From c18bfad22298bd25d63731aabb5d7ce80cfbe267 Mon Sep 17 00:00:00 2001 From: fschade Date: Tue, 29 Jul 2025 12:49:13 +0200 Subject: [PATCH] enhancement(search): implement engine purge --- services/search/pkg/opensearch/engine.go | 2 +- services/search/pkg/opensearch/engine_test.go | 10 +-- .../search/pkg/opensearch/fixtures_test.go | 1 - .../search/pkg/opensearch/internal/test/os.go | 66 +++++++++++++++---- .../search/pkg/opensearch/opensearch_test.go | 4 +- 5 files changed, 60 insertions(+), 23 deletions(-) delete mode 100644 services/search/pkg/opensearch/fixtures_test.go diff --git a/services/search/pkg/opensearch/engine.go b/services/search/pkg/opensearch/engine.go index cf8b01e12..f92ca81a9 100644 --- a/services/search/pkg/opensearch/engine.go +++ b/services/search/pkg/opensearch/engine.go @@ -61,7 +61,7 @@ func (e *Engine) Purge(id string) error { DocumentID: id, }) if err != nil { - return fmt.Errorf("failed to index document: %w", err) + return fmt.Errorf("failed to purge document: %w", err) } return nil diff --git a/services/search/pkg/opensearch/engine_test.go b/services/search/pkg/opensearch/engine_test.go index 2185fc543..24b6e5513 100644 --- a/services/search/pkg/opensearch/engine_test.go +++ b/services/search/pkg/opensearch/engine_test.go @@ -15,6 +15,8 @@ func TestEngine_Upsert(t *testing.T) { tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, 0) + defer tc.Require.IndicesDelete([]string{index}) + engine, err := opensearch.NewEngine(index, tc.Client()) assert.NoError(t, err) @@ -23,7 +25,6 @@ func TestEngine_Upsert(t *testing.T) { assert.NoError(t, engine.Upsert(document.ID, document)) tc.Require.IndicesCount([]string{index}, 1) - tc.Require.IndicesDelete([]string{index}) }) } @@ -39,20 +40,19 @@ func TestEngine_Purge(t *testing.T) { tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, 0) + defer tc.Require.IndicesDelete([]string{index}) + engine, err := opensearch.NewEngine(index, tc.Client()) assert.NoError(t, err) t.Run("Purge with full document", func(t *testing.T) { document := ostest.Testdata.Resources.Full - assert.NoError(t, engine.Upsert(document.ID, document)) - + tc.Require.DocumentCreate(index, document.ID, toJSON(t, document)) tc.Require.IndicesCount([]string{index}, 1) assert.NoError(t, engine.Purge(document.ID)) tc.Require.IndicesCount([]string{index}, 0) - - tc.Require.IndicesDelete([]string{index}) }) } diff --git a/services/search/pkg/opensearch/fixtures_test.go b/services/search/pkg/opensearch/fixtures_test.go deleted file mode 100644 index 5cfac2a0d..000000000 --- a/services/search/pkg/opensearch/fixtures_test.go +++ /dev/null @@ -1 +0,0 @@ -package opensearch_test diff --git a/services/search/pkg/opensearch/internal/test/os.go b/services/search/pkg/opensearch/internal/test/os.go index 8d322119c..6a0058baa 100644 --- a/services/search/pkg/opensearch/internal/test/os.go +++ b/services/search/pkg/opensearch/internal/test/os.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "strings" "testing" @@ -38,7 +39,12 @@ func (tc *TestClient) Client() *opensearchgoAPI.Client { func (tc *TestClient) IndicesReset(ctx context.Context, indices []string) error { indicesToDelete := make([]string, 0, len(indices)) for _, index := range indices { - if err := tc.IndicesRefresh(ctx, indices); err != nil { + exist, err := tc.IndicesExists(ctx, []string{index}) + if err != nil { + return fmt.Errorf("failed to check if index %s exists: %w", index, err) + } + + if !exist { continue } @@ -54,7 +60,7 @@ func (tc *TestClient) IndicesReset(ctx context.Context, indices []string) error } func (tc *TestClient) IndicesExists(ctx context.Context, indices []string) (bool, error) { - if err := tc.IndicesRefresh(ctx, indices); err != nil { + if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil { return false, err } @@ -62,25 +68,31 @@ func (tc *TestClient) IndicesExists(ctx context.Context, indices []string) (bool Indices: indices, }) switch { + case resp != nil && resp.StatusCode == 404: + return false, nil case err != nil: - return false, err - case resp.IsError(): + return false, fmt.Errorf("failed to check if indices exist: %w", err) + case resp != nil && resp.IsError(): return false, fmt.Errorf("failed to check if indices exist: %s", resp.String()) default: return true, nil } } -func (tc *TestClient) IndicesRefresh(ctx context.Context, indices []string) error { - _, err := tc.c.Indices.Refresh(ctx, &opensearchgoAPI.IndicesRefreshReq{ +func (tc *TestClient) IndicesRefresh(ctx context.Context, indices []string, allow []int) error { + resp, err := tc.c.Indices.Refresh(ctx, &opensearchgoAPI.IndicesRefreshReq{ Indices: indices, }) - return err + if err != nil && !(resp != nil && slices.Contains(allow, resp.Inspect().Response.StatusCode)) { + return fmt.Errorf("failed to refresh indices %v: %w", indices, err) + } + + return nil } func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error { - if err := tc.IndicesRefresh(ctx, indices); err != nil { + if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil { return err } @@ -98,7 +110,7 @@ func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error } func (tc *TestClient) IndicesCount(ctx context.Context, indices []string) (int, error) { - if err := tc.IndicesRefresh(ctx, indices); err != nil { + if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil { return 0, err } @@ -115,6 +127,10 @@ func (tc *TestClient) IndicesCount(ctx context.Context, indices []string) (int, } func (tc *TestClient) IndexCreate(ctx context.Context, index string, body string) error { + if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil { + return err + } + resp, err := tc.c.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{ Index: index, Body: strings.NewReader(body), @@ -130,13 +146,31 @@ func (tc *TestClient) IndexCreate(ctx context.Context, index string, body string } } +func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body string) error { + if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil { + return err + } + + _, err := tc.c.Document.Create(ctx, opensearchgoAPI.DocumentCreateReq{ + Index: index, + DocumentID: id, + Body: strings.NewReader(body), + }) + switch { + case err != nil: + return fmt.Errorf("failed to create document in index %s: %w", index, err) + default: + return nil + } +} + type testRequireClient struct { tc *TestClient t *testing.T } func (trc *testRequireClient) IndicesReset(indices []string) { - require.NoError(trc.t, trc.tc.IndicesReset(trc.t.Context(), indices), "Failed to reset indices") + require.NoError(trc.t, trc.tc.IndicesReset(trc.t.Context(), indices)) } func (trc *testRequireClient) IndicesExists(indices []string, expected bool) { @@ -151,12 +185,12 @@ func (trc *testRequireClient) IndicesExists(indices []string, expected bool) { } } -func (trc *testRequireClient) IndicesRefresh(indices []string) { - require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices), "Failed to refresh indices") +func (trc *testRequireClient) IndicesRefresh(indices []string, ignore []int) { + require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices, ignore)) } func (trc *testRequireClient) IndicesDelete(indices []string) { - require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices), "Failed to delete indices") + require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices)) } func (trc *testRequireClient) IndicesCount(indices []string, expected int) { @@ -172,5 +206,9 @@ func (trc *testRequireClient) IndicesCount(indices []string, expected int) { } func (trc *testRequireClient) IndexCreate(index string, body string) { - require.NoError(trc.t, trc.tc.IndexCreate(trc.t.Context(), index, body), "Failed to create index %s", index) + require.NoError(trc.t, trc.tc.IndexCreate(trc.t.Context(), index, body)) +} + +func (trc *testRequireClient) DocumentCreate(index string, id, body string) { + require.NoError(trc.t, trc.tc.DocumentCreate(trc.t.Context(), index, id, body)) } diff --git a/services/search/pkg/opensearch/opensearch_test.go b/services/search/pkg/opensearch/opensearch_test.go index 79804ba75..db18b4809 100644 --- a/services/search/pkg/opensearch/opensearch_test.go +++ b/services/search/pkg/opensearch/opensearch_test.go @@ -4,7 +4,7 @@ import ( "encoding/json" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type tableTest[G any, W any] struct { @@ -15,6 +15,6 @@ type tableTest[G any, W any] struct { func toJSON(t *testing.T, data any) string { jsonData, err := json.Marshal(data) - assert.NoError(t, err, "failed to marshal data to JSON") + require.NoError(t, err, "failed to marshal data to JSON") return string(jsonData) }