enhancement(search): implement index manager and remove the use of index templates

This commit is contained in:
fschade
2025-08-06 14:43:58 +02:00
parent 9faa09e4c6
commit 1586f7fcbb
17 changed files with 1346 additions and 233 deletions
+2 -28
View File
@@ -39,36 +39,10 @@ func NewEngine(index string, client *opensearchgoAPI.Client) (*Engine, error) {
}
// create the index from the embedded definition, or verify that an existing index matches it
if err := IndexTemplateResourceV1.Apply(context.TODO(), client); err != nil {
if err := IndexManagerLatest.Apply(context.TODO(), index, client); err != nil {
return nil, fmt.Errorf("failed to apply index template: %w", err)
}
indicesExistsResp, err := client.Indices.Exists(context.TODO(), opensearchgoAPI.IndicesExistsReq{
Indices: []string{index},
})
switch {
case indicesExistsResp != nil && indicesExistsResp.StatusCode == 404:
break
case err != nil:
return nil, fmt.Errorf("failed to check if index exists: %w", err)
case indicesExistsResp == nil:
return nil, fmt.Errorf("unexpected nil response when checking if index exists")
}
// if the index does not exist, we need to create it
if indicesExistsResp.StatusCode == 404 {
resp, err := client.Indices.Create(context.TODO(), opensearchgoAPI.IndicesCreateReq{
Index: index,
// the body is not necessary; we will use an index template to define the index settings and mappings
})
switch {
case err != nil:
return nil, fmt.Errorf("failed to create index: %w", err)
case !resp.Acknowledged:
return nil, fmt.Errorf("failed to create index: %s", index)
}
}
// first check if the cluster is healthy
_, healthy, err := clusterHealth(context.TODO(), client, []string{index})
switch {
@@ -116,7 +90,7 @@ func (e *Engine) Search(ctx context.Context, sir *searchService.SearchIndexReque
}
body, err := NewRootQuery(boolQuery, RootQueryOptions{
Highlight: RootQueryHighlight{
Highlight: &RootQueryHighlight{
PreTags: []string{"<mark>"},
PostTags: []string{"</mark>"},
Fields: map[string]RootQueryHighlight{
+63 -63
View File
@@ -23,26 +23,26 @@ func TestNewEngine(t *testing.T) {
})
require.NoError(t, err, "failed to create OpenSearch client")
engine, err := opensearch.NewEngine("test-engine-new-engine", client)
require.Nil(t, engine)
backend, err := opensearch.NewEngine("test-engine-new-engine", client)
require.Nil(t, backend)
require.ErrorIs(t, err, opensearch.ErrUnhealthyCluster)
})
}
func TestEngine_Search(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
backend, err := opensearch.NewEngine(index, tc.Client())
require.NoError(t, err)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
t.Run("most simple search", func(t *testing.T) {
resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{
@@ -59,8 +59,8 @@ func TestEngine_Search(t *testing.T) {
deletedDocument.ID = "1$2!4"
deletedDocument.Deleted = true
tc.Require.DocumentCreate(index, deletedDocument.ID, opensearchtest.JSONMustMarshal(t, deletedDocument))
tc.Require.IndicesCount([]string{index}, "", 2)
tc.Require.DocumentCreate(indexName, deletedDocument.ID, opensearchtest.JSONMustMarshal(t, deletedDocument))
tc.Require.IndicesCount([]string{indexName}, "", 2)
resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{
Query: fmt.Sprintf(`"%s"`, document.Name),
@@ -73,43 +73,43 @@ func TestEngine_Search(t *testing.T) {
}
func TestEngine_Upsert(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("upsert with full document", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
require.NoError(t, backend.Upsert(document.ID, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.IndicesCount([]string{indexName}, "", 1)
})
}
func TestEngine_Move(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("moves the document to a new path", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t,
tc.Require.Search(
index,
indexName,
opensearch.NewRootQuery(
opensearch.NewIDsQuery([]string{document.ID}),
).String(),
@@ -123,7 +123,7 @@ func TestEngine_Move(t *testing.T) {
resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t,
tc.Require.Search(
index,
indexName,
opensearch.NewRootQuery(
opensearch.NewIDsQuery([]string{document.ID}),
).String(),
@@ -135,109 +135,109 @@ func TestEngine_Move(t *testing.T) {
}
func TestEngine_Delete(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("mark document as deleted", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.IndicesCount([]string{index}, opensearch.NewRootQuery(
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 0)
require.NoError(t, backend.Delete(document.ID))
tc.Require.IndicesCount([]string{index}, opensearch.NewRootQuery(
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 1)
})
}
func TestEngine_Restore(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("mark document as not deleted", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
document.Deleted = true
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.IndicesCount([]string{index}, opensearch.NewRootQuery(
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 1)
require.NoError(t, backend.Restore(document.ID))
tc.Require.IndicesCount([]string{index}, opensearch.NewRootQuery(
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 0)
})
}
func TestEngine_Purge(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("purge with full document", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
require.NoError(t, backend.Purge(document.ID))
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCount([]string{indexName}, "", 0)
})
}
func TestEngine_DocCount(t *testing.T) {
index := "opencloud-default-resource"
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
defer tc.Require.IndicesDelete([]string{index})
defer tc.Require.IndicesDelete([]string{indexName})
backend, err := opensearch.NewEngine(index, tc.Client())
backend, err := opensearch.NewEngine(indexName, tc.Client())
require.NoError(t, err)
t.Run("ignore deleted documents", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(index, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
count, err := backend.DocCount()
require.NoError(t, err)
require.Equal(t, uint64(1), count)
tc.Require.Update(index, document.ID, opensearchtest.JSONMustMarshal(t, map[string]any{
tc.Require.Update(indexName, document.ID, opensearchtest.JSONMustMarshal(t, map[string]any{
"doc": map[string]any{
"Deleted": true,
},
}))
tc.Require.IndicesCount([]string{index}, "", 1)
tc.Require.IndicesCount([]string{indexName}, "", 1)
count, err = backend.DocCount()
require.NoError(t, err)
@@ -0,0 +1,33 @@
{
"settings": {
"number_of_shards": "1",
"number_of_replicas": "1"
},
"mappings": {
"properties": {
"ID": {
"type": "keyword"
},
"ParentID": {
"type": "keyword"
},
"RootID": {
"type": "keyword"
},
"MimeType": {
"type": "wildcard",
"doc_values": false
},
"Path": {
"type": "wildcard",
"doc_values": false
},
"Deleted": {
"type": "boolean"
},
"Hidden": {
"type": "boolean"
}
}
}
}
@@ -1,40 +0,0 @@
{
"index_patterns": [
"opencloud-default-*"
],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"ID": {
"type": "keyword"
},
"ParentID": {
"type": "keyword"
},
"RootID": {
"type": "keyword"
},
"MimeType": {
"type": "wildcard"
},
"Path": {
"type": "wildcard"
},
"Deleted": {
"type": "boolean"
},
"Hidden": {
"type": "boolean"
}
}
}
},
"version": 1,
"_meta": {
"description": "using component templates"
}
}
@@ -50,7 +50,7 @@ func (q *RootQuery) String() string {
}
type RootQueryOptions struct {
Highlight RootQueryHighlight `json:"highlight,omitempty"`
Highlight *RootQueryHighlight `json:"highlight,omitempty"`
}
type RootQueryHighlight struct {
@@ -29,7 +29,7 @@ func TestRootQuery(t *testing.T) {
Got: opensearch.NewRootQuery(
opensearch.NewTermQuery[string]("content").Value("content"),
opensearch.RootQueryOptions{
Highlight: opensearch.RootQueryHighlight{
Highlight: &opensearch.RootQueryHighlight{
PreTags: []string{"<b>"},
PostTags: []string{"</b>"},
Fields: map[string]opensearch.RootQueryHighlight{
+142
View File
@@ -0,0 +1,142 @@
package opensearch
import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"path"
"reflect"
"github.com/go-jose/go-jose/v3/json"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/tidwall/gjson"
)
// Index definitions known to the search service, plus the sentinel error that
// Apply wraps when an existing index cannot be reconciled automatically.
var (
	// ErrManualActionRequired is wrapped into the error returned by
	// IndexManager.Apply when the index already exists but differs from the
	// embedded definition. Detect it with errors.Is.
	ErrManualActionRequired = errors.New("manual action required")

	// IndexIndexManagerResourceV1 refers to the embedded "resource_v1.json"
	// index definition.
	IndexIndexManagerResourceV1 IndexManager = "resource_v1.json"

	// IndexManagerLatest is an alias for the most recent index definition.
	IndexManagerLatest = IndexIndexManagerResourceV1
)
// indexes holds the JSON index definitions from internal/indexes, embedded
// into the binary at build time. IndexManager.MarshalJSON reads from it.
//
//go:embed internal/indexes/*.json
var indexes embed.FS
// IndexManager identifies an embedded index definition by its file name
// (relative to internal/indexes), e.g. "resource_v1.json".
type IndexManager string
// String returns the embedded index definition as a JSON string. It returns
// the empty string when the definition cannot be read.
func (m IndexManager) String() string {
	if body, err := m.MarshalJSON(); err == nil {
		return string(body)
	}

	return ""
}
// MarshalJSON returns the raw bytes of the embedded index definition file
// named by m. It fails when the file cannot be read or is empty.
func (m IndexManager) MarshalJSON() ([]byte, error) {
	name := string(m)

	// path.Join also cleans the path, dropping the leading "./".
	content, err := indexes.ReadFile(path.Join("./internal/indexes", name))
	if err != nil {
		return nil, fmt.Errorf("failed to read index file %s: %w", name, err)
	}
	if len(content) == 0 {
		return nil, fmt.Errorf("index file %s is empty", name)
	}

	return content, nil
}
// Apply makes sure the index called name exists on the cluster behind client
// and matches the embedded definition m.
//
// If the index does not exist, it is created from the embedded definition.
// If it already exists, every setting and every mapping property declared in
// the embedded definition is compared with the remote index; keys that only
// exist remotely are ignored. An existing index is never modified: any
// difference is returned as an error that wraps ErrManualActionRequired
// together with one error per differing key.
func (m IndexManager) Apply(ctx context.Context, name string, client *opensearchgoAPI.Client) error {
	localIndexB, err := m.MarshalJSON()
	if err != nil {
		return fmt.Errorf("failed to marshal index %s: %w", name, err)
	}

	indicesExistsResp, err := client.Indices.Exists(ctx, opensearchgoAPI.IndicesExistsReq{
		Indices: []string{name},
	})
	switch {
	case indicesExistsResp != nil && indicesExistsResp.StatusCode == 404:
		// Checked before err on purpose: a 404 only means the index is missing
		// (presumably the client also reports it as an error - confirm).
	case err != nil:
		return fmt.Errorf("failed to check if index %s exists: %w", name, err)
	case indicesExistsResp == nil:
		return fmt.Errorf("indicesExistsResp is nil for index %s", name)
	}

	if indicesExistsResp.StatusCode == 200 {
		resp, err := client.Indices.Get(ctx, opensearchgoAPI.IndicesGetReq{
			Indices: []string{name},
		})
		if err != nil {
			return fmt.Errorf("failed to get index %s: %w", name, err)
		}

		remoteIndex, ok := resp.Indices[name]
		if !ok {
			return fmt.Errorf("index %s not found in response", name)
		}

		remoteIndexB, err := json.Marshal(remoteIndex)
		if err != nil {
			return fmt.Errorf("failed to marshal index %s: %w", name, err)
		}

		localIndexJSON := gjson.ParseBytes(localIndexB)
		remoteIndexJSON := gjson.ParseBytes(remoteIndexB)

		// rawOrMissing renders a lookup result for use in error messages.
		rawOrMissing := func(r gjson.Result) string {
			if !r.Exists() {
				return "<missing>"
			}
			return r.Raw
		}

		// compare decodes the values found at the two paths and reports
		// whether they are deeply equal. The raw values are always returned,
		// also when a value is missing or cannot be decoded, so that callers
		// can produce a meaningful message.
		compare := func(lvPath, rvPath string) (any, any, bool) {
			localRes := localIndexJSON.Get(lvPath)
			remoteRes := remoteIndexJSON.Get(rvPath)
			lv, rv := rawOrMissing(localRes), rawOrMissing(remoteRes)

			var lvv, rvv any
			if err := json.Unmarshal([]byte(localRes.Raw), &lvv); err != nil {
				return lv, rv, false
			}
			if err := json.Unmarshal([]byte(remoteRes.Raw), &rvv); err != nil {
				return lv, rv, false
			}

			return lv, rv, reflect.DeepEqual(lvv, rvv)
		}

		var errs []error

		// Local settings live directly below "settings", the remote index
		// reports them below "settings.index".
		for k := range localIndexJSON.Get("settings").Map() {
			if lv, rv, ok := compare("settings."+k, "settings.index."+k); !ok {
				errs = append(errs, fmt.Errorf("settings.%s local %s, remote %s", k, lv, rv))
			}
		}

		for k := range localIndexJSON.Get("mappings.properties").Map() {
			if lv, rv, ok := compare("mappings.properties."+k, "mappings.properties."+k); !ok {
				errs = append(errs, fmt.Errorf("mappings.properties.%s local %s, remote %s", k, lv, rv))
			}
		}

		if len(errs) > 0 {
			return fmt.Errorf(
				"index %s already exists and is different from the requested version, %w: %w",
				name,
				ErrManualActionRequired,
				errors.Join(errs...),
			)
		}

		return nil // Index is already up to date, no action needed
	}

	createResp, err := client.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{
		Index: name,
		Body:  bytes.NewReader(localIndexB),
	})
	switch {
	case err != nil:
		return fmt.Errorf("failed to create index %s: %w", name, err)
	case !createResp.Acknowledged:
		return fmt.Errorf("failed to create index %s: not acknowledged", name)
	}

	return nil
}
@@ -1,65 +0,0 @@
package opensearch
import (
"bytes"
"context"
"embed"
"fmt"
"path"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
// IndexTemplateResourceV1 pairs the template name "opencloud-default-resource"
// with the embedded template file "resource_v1.json".
var IndexTemplateResourceV1 = IndexTemplate{"opencloud-default-resource", "resource_v1.json"}
// indexTemplates holds the JSON index template files from internal/indices,
// embedded into the binary at build time. IndexTemplate.MarshalJSON reads
// from it.
//
//go:embed internal/indices/*.json
var indexTemplates embed.FS
// IndexTemplate describes an embedded index template: element 0 is the
// template name, element 1 is the file name below internal/indices.
type IndexTemplate [2]string
// Name returns the name under which the template is registered, i.e. the
// first element of t.
func (t IndexTemplate) Name() string {
	const nameIdx = 0

	return t[nameIdx]
}
// String returns the embedded template as a JSON string. It returns the empty
// string when the template file cannot be read.
func (t IndexTemplate) String() string {
	if body, err := t.MarshalJSON(); err == nil {
		return string(body)
	}

	return ""
}
// MarshalJSON returns the raw bytes of the embedded template file referenced
// by t. It fails when the file cannot be read or is empty.
func (t IndexTemplate) MarshalJSON() ([]byte, error) {
	fileName := t[1]

	// path.Join also cleans the path, dropping the leading "./".
	content, err := indexTemplates.ReadFile(path.Join("./internal/indices", fileName))
	if err != nil {
		return nil, fmt.Errorf("failed to read index template file %s: %w", fileName, err)
	}
	if len(content) == 0 {
		return nil, fmt.Errorf("index template file %s is empty", fileName)
	}

	return content, nil
}
// Apply sends the embedded template to the cluster behind client via the
// index template create request, registered under t.Name(). It fails when the
// template file cannot be read, the request errors, or the cluster does not
// acknowledge the request.
func (t IndexTemplate) Apply(ctx context.Context, client *opensearchgoAPI.Client) error {
	payload, err := t.MarshalJSON()
	if err != nil {
		return fmt.Errorf("failed to inspect index template %s: %w", t[1], err)
	}

	templateName := t.Name()
	req := opensearchgoAPI.IndexTemplateCreateReq{
		IndexTemplate: templateName,
		Body:          bytes.NewBuffer(payload),
	}

	resp, err := client.IndexTemplate.Create(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to create index template %s: %w", templateName, err)
	}
	if !resp.Acknowledged {
		return fmt.Errorf("failed to create index template %s: not acknowledged", templateName)
	}

	return nil
}
@@ -1,34 +0,0 @@
package opensearch_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
opensearchtest "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
)
// TestIndexTemplates checks that every known index template can be read, is
// non-empty valid JSON, has a name, and can be applied via the test client.
func TestIndexTemplates(t *testing.T) {
	tc := opensearchtest.NewDefaultTestClient(t)

	t.Run("index templates plausibility", func(t *testing.T) {
		cases := []opensearchtest.TableTest[opensearch.IndexTemplate, struct{}]{
			{Name: "empty", Got: opensearch.IndexTemplateResourceV1},
		}

		for _, tt := range cases {
			t.Run(tt.Name, func(t *testing.T) {
				raw, err := tt.Got.MarshalJSON()
				require.NoError(t, err)
				require.NotEmpty(t, raw)

				// String must expose the same document as MarshalJSON.
				require.NotEmpty(t, tt.Got.String())
				require.JSONEq(t, tt.Got.String(), string(raw))

				require.NotEmpty(t, tt.Got.Name())
				require.NoError(t, tt.Got.Apply(t.Context(), tc.Client()))
			})
		}
	})
}
@@ -0,0 +1,62 @@
package opensearch_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/tidwall/sjson"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
opensearchtest "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
)
// TestIndex covers IndexManager: the embedded definition is readable and
// applicable, applying it to an identical existing index succeeds, and
// applying it to a diverging existing index reports ErrManualActionRequired.
func TestIndex(t *testing.T) {
	const indexName = "opencloud-test-resource"

	t.Run("index plausibility", func(t *testing.T) {
		cases := []opensearchtest.TableTest[opensearch.IndexManager, struct{}]{
			{Name: "empty", Got: opensearch.IndexManagerLatest},
		}

		tc := opensearchtest.NewDefaultTestClient(t)

		for _, tt := range cases {
			t.Run(tt.Name, func(t *testing.T) {
				tc.Require.IndicesReset([]string{indexName})

				raw, err := tt.Got.MarshalJSON()
				require.NoError(t, err)
				require.NotEmpty(t, raw)

				// String must expose the same document as MarshalJSON.
				require.NotEmpty(t, tt.Got.String())
				require.JSONEq(t, tt.Got.String(), string(raw))

				require.NoError(t, tt.Got.Apply(t.Context(), indexName, tc.Client()))
			})
		}
	})

	t.Run("does not create index if it already exists and is up to date", func(t *testing.T) {
		manager := opensearch.IndexManagerLatest
		tc := opensearchtest.NewDefaultTestClient(t)

		// Create the index from the very same definition beforehand.
		tc.Require.IndicesReset([]string{indexName})
		tc.Require.IndicesCreate(indexName, manager.String())

		require.NoError(t, manager.Apply(t.Context(), indexName, tc.Client()))
	})

	t.Run("fails to create index if it already exists but is not up to date", func(t *testing.T) {
		manager := opensearch.IndexManagerLatest
		tc := opensearchtest.NewDefaultTestClient(t)

		tc.Require.IndicesReset([]string{indexName})

		// Create the index with a shard count that differs from the definition.
		diverging, err := sjson.Set(manager.String(), "settings.number_of_shards", "2")
		require.NoError(t, err)
		tc.Require.IndicesCreate(indexName, diverging)

		applyErr := manager.Apply(t.Context(), indexName, tc.Client())
		require.ErrorIs(t, applyErr, opensearch.ErrManualActionRequired)
	})
}