enhancement(search): implement batch api

This commit is contained in:
fschade
2025-09-02 10:26:11 +02:00
parent 82e75e19c1
commit f615ccc896
8 changed files with 446 additions and 198 deletions

View File

@@ -160,48 +160,70 @@ func (b *Backend) DocCount() (uint64, error) {
}
func (b *Backend) Upsert(id string, r search.Resource) error {
return b.withBatch(func(batch search.BatchOperator) error {
return batch.Upsert(id, r)
})
}
func (b *Backend) Move(rootID, parentID, location string) error {
return b.withBatch(func(batch search.BatchOperator) error {
return batch.Move(rootID, parentID, location)
})
}
func (b *Backend) Delete(id string) error {
return b.withBatch(func(batch search.BatchOperator) error {
return batch.Delete(id)
})
}
func (b *Backend) Restore(id string) error {
return b.withBatch(func(batch search.BatchOperator) error {
return batch.Restore(id)
})
}
func (b *Backend) Purge(id string) error {
return b.withBatch(func(batch search.BatchOperator) error {
return batch.Purge(id)
})
}
func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
return NewBatch(b.index, size)
}
func (b *Backend) withBatch(f func(batch search.BatchOperator) error) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := f(batch); err != nil {
if err := batch.Upsert(id, r); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) Move(rootID, parentID, location string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Move(rootID, parentID, location); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) Delete(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Delete(id); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) Restore(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Restore(id); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) Purge(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Purge(id); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
return NewBatch(b.index, size)
}

View File

@@ -1,27 +1,27 @@
package opensearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"path"
"strings"
"time"
storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/reva/v2/pkg/utils"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opencloud-eu/opencloud/pkg/conversions"
searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
searchService "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/convert"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
const defaultBatchSize = 50
var (
ErrUnhealthyCluster = fmt.Errorf("cluster is not healthy")
)
@@ -66,7 +66,7 @@ func NewBackend(index string, client *opensearchgoAPI.Client) (*Backend, error)
return &Backend{index: index, client: client}, nil
}
func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
func (b *Backend) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
boolQuery, err := convert.KQLToOpenSearchBoolQuery(sir.Query)
if err != nil {
return nil, fmt.Errorf("failed to convert KQL query to OpenSearch bool query: %w", err)
@@ -104,7 +104,7 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
}
req, err := osu.BuildSearchReq(&opensearchgoAPI.SearchReq{
Indices: []string{be.index},
Indices: []string{b.index},
Params: searchParams,
},
boolQuery,
@@ -122,7 +122,7 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
return nil, fmt.Errorf("failed to build search request: %w", err)
}
resp, err := be.client.Search(ctx, req)
resp, err := b.client.Search(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to search: %w", err)
}
@@ -155,101 +155,10 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
}, nil
}
func (be *Backend) Upsert(id string, r engine.Resource) error {
body, err := json.Marshal(r)
if err != nil {
return fmt.Errorf("failed to marshal resource: %w", err)
}
_, err = be.client.Index(context.TODO(), opensearchgoAPI.IndexReq{
Index: be.index,
DocumentID: id,
Body: bytes.NewReader(body),
})
if err != nil {
return fmt.Errorf("failed to index document: %w", err)
}
return nil
}
func (be *Backend) Move(id string, parentID string, target string) error {
return be.updateSelfAndDescendants(id, func(rootResource engine.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: `
if (ctx._source.ID == params.id ) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; }
ctx._source.Path = ctx._source.Path.replace(params.oldPath, params.newPath)
`,
Lang: "painless",
Params: map[string]any{
"id": id,
"parentID": parentID,
"oldPath": rootResource.Path,
"newPath": utils.MakeRelativePath(target),
"newName": path.Base(utils.MakeRelativePath(target)),
},
}
})
}
func (be *Backend) Delete(id string) error {
return be.updateSelfAndDescendants(id, func(_ engine.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: "ctx._source.Deleted = params.deleted",
Lang: "painless",
Params: map[string]any{
"deleted": true,
},
}
})
}
func (be *Backend) Restore(id string) error {
return be.updateSelfAndDescendants(id, func(_ engine.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: "ctx._source.Deleted = params.deleted",
Lang: "painless",
Params: map[string]any{
"deleted": false,
},
}
})
}
func (be *Backend) Purge(id string) error {
resource, err := be.getResource(id)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
}
req, err := osu.BuildDocumentDeleteByQueryReq(
opensearchgoAPI.DocumentDeleteByQueryReq{
Indices: []string{be.index},
Params: opensearchgoAPI.DocumentDeleteByQueryParams{
WaitForCompletion: conversions.ToPointer(true),
},
},
osu.NewTermQuery[string]("Path").Value(resource.Path),
)
if err != nil {
return fmt.Errorf("failed to build delete by query request: %w", err)
}
resp, err := be.client.Document.DeleteByQuery(context.TODO(), req)
switch {
case err != nil:
return fmt.Errorf("failed to delete by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures)
}
return nil
}
func (be *Backend) DocCount() (uint64, error) {
func (b *Backend) DocCount() (uint64, error) {
req, err := osu.BuildIndicesCountReq(
&opensearchgoAPI.IndicesCountReq{
Indices: []string{be.index},
Indices: []string{b.index},
},
osu.NewTermQuery[bool]("Deleted").Value(false),
)
@@ -257,7 +166,7 @@ func (be *Backend) DocCount() (uint64, error) {
return 0, fmt.Errorf("failed to build count request: %w", err)
}
resp, err := be.client.Indices.Count(context.TODO(), req)
resp, err := b.client.Indices.Count(context.TODO(), req)
if err != nil {
return 0, fmt.Errorf("failed to count documents: %w", err)
}
@@ -265,74 +174,71 @@ func (be *Backend) DocCount() (uint64, error) {
return uint64(resp.Count), nil
}
func (be *Backend) updateSelfAndDescendants(id string, scriptProvider func(engine.Resource) *osu.BodyParamScript) error {
if scriptProvider == nil {
return fmt.Errorf("script cannot be nil")
}
resource, err := be.getResource(id)
func (b *Backend) Upsert(id string, r search.Resource) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
return err
}
req, err := osu.BuildUpdateByQueryReq(
opensearchgoAPI.UpdateByQueryReq{
Indices: []string{be.index},
Params: opensearchgoAPI.UpdateByQueryParams{
WaitForCompletion: conversions.ToPointer(true),
},
},
osu.NewTermQuery[string]("Path").Value(resource.Path),
osu.UpdateByQueryBodyParams{
Script: scriptProvider(resource),
},
)
if err != nil {
return fmt.Errorf("failed to build update by query request: %w", err)
if err := batch.Upsert(id, r); err != nil {
return err
}
resp, err := be.client.UpdateByQuery(context.TODO(), req)
switch {
case err != nil:
return fmt.Errorf("failed to update by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to update by query, failures: %v", resp.Failures)
}
return nil
return batch.Push()
}
func (be *Backend) getResource(id string) (engine.Resource, error) {
req, err := osu.BuildSearchReq(
&opensearchgoAPI.SearchReq{
Indices: []string{be.index},
},
osu.NewIDsQuery(id),
)
func (b *Backend) Move(id string, parentID string, target string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return engine.Resource{}, fmt.Errorf("failed to build search request: %w", err)
return err
}
resp, err := be.client.Search(context.TODO(), req)
switch {
case err != nil:
return engine.Resource{}, fmt.Errorf("failed to search for resource: %w", err)
case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0:
return engine.Resource{}, fmt.Errorf("document with id %s not found", id)
if err := batch.Move(id, parentID, target); err != nil {
return err
}
resource, err := conversions.To[engine.Resource](resp.Hits.Hits[0].Source)
return batch.Push()
}
func (b *Backend) Delete(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return engine.Resource{}, fmt.Errorf("failed to convert hit source: %w", err)
return err
}
return resource, nil
if err := batch.Delete(id); err != nil {
return err
}
return batch.Push()
}
func (be *Backend) StartBatch(_ int) error {
return nil // todo: implement batch processing
func (b *Backend) Restore(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Restore(id); err != nil {
return err
}
return batch.Push()
}
func (be *Backend) EndBatch() error {
return nil // todo: implement batch processing
func (b *Backend) Purge(id string) error {
batch, err := b.NewBatch(defaultBatchSize)
if err != nil {
return err
}
if err := batch.Purge(id); err != nil {
return err
}
return batch.Push()
}
func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
return NewBatch(b.client, b.index, size)
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/stretchr/testify/require"
searchService "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
func TestNewBackend(t *testing.T) {
@@ -116,14 +116,14 @@ func TestEngine_Move(t *testing.T) {
},
})
resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
resources := opensearchtest.SearchHitsMustBeConverted[search.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
require.Len(t, resources, 1)
require.Equal(t, document.Path, resources[0].Path)
document.Path = "./new/path/to/resource"
require.NoError(t, backend.Move(document.ID, document.ParentID, document.Path))
resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
resources = opensearchtest.SearchHitsMustBeConverted[search.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
require.Len(t, resources, 1)
require.Equal(t, document.Path, resources[0].Path)
})

View File

@@ -0,0 +1,243 @@
package opensearch
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"sync"
"github.com/opencloud-eu/reva/v2/pkg/utils"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opencloud-eu/opencloud/pkg/conversions"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
var _ search.BatchOperator = &Batch{} // ensure Batch implements BatchOperator
type Batch struct {
client *opensearchgoAPI.Client
index string
size int
log log.Logger
operations []any
mu sync.Mutex
}
func NewBatch(client *opensearchgoAPI.Client, index string, size int) (*Batch, error) {
if size <= 0 {
return nil, errors.New("batch size must be greater than 0")
}
return &Batch{
client: client,
size: size,
index: index,
}, nil
}
func (b *Batch) Upsert(id string, r search.Resource) error {
return b.withSizeLimit(func() error {
body, err := conversions.To[map[string]any](r)
if err != nil {
return fmt.Errorf("failed to marshal resource: %w", err)
}
op := func() []map[string]any {
return []map[string]any{
{"index": map[string]any{"_index": b.index, "_id": id}},
body,
}
}
b.mu.Lock()
b.operations = append(b.operations, op)
b.mu.Unlock()
return nil
})
}
func (b *Batch) Move(id, parentID, location string) error {
return b.withSizeLimit(func() error {
op := func() error {
return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(rootResource search.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: `
if (ctx._source.ID == params.id ) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; }
ctx._source.Path = ctx._source.Path.replace(params.oldPath, params.newPath)
`,
Lang: "painless",
Params: map[string]any{
"id": id,
"parentID": parentID,
"oldPath": rootResource.Path,
"newPath": utils.MakeRelativePath(location),
"newName": path.Base(utils.MakeRelativePath(location)),
},
}
})
}
b.mu.Lock()
b.operations = append(b.operations, op)
b.mu.Unlock()
return nil
})
}
func (b *Batch) Delete(id string) error {
return b.withSizeLimit(func() error {
op := func() error {
return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(_ search.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: "ctx._source.Deleted = params.deleted",
Lang: "painless",
Params: map[string]any{
"deleted": true,
},
}
})
}
b.mu.Lock()
b.operations = append(b.operations, op)
b.mu.Unlock()
return nil
})
}
func (b *Batch) Restore(id string) error {
return b.withSizeLimit(func() error {
op := func() error {
return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(_ search.Resource) *osu.BodyParamScript {
return &osu.BodyParamScript{
Source: "ctx._source.Deleted = params.deleted",
Lang: "painless",
Params: map[string]any{
"deleted": false,
},
}
})
}
b.mu.Lock()
b.operations = append(b.operations, op)
b.mu.Unlock()
return nil
})
}
func (b *Batch) Purge(id string) error {
return b.withSizeLimit(func() error {
resource, err := searchResourceByID(context.Background(), b.client, b.index, id)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
}
req, err := osu.BuildDocumentDeleteByQueryReq(
opensearchgoAPI.DocumentDeleteByQueryReq{
Indices: []string{b.index},
Params: opensearchgoAPI.DocumentDeleteByQueryParams{
WaitForCompletion: conversions.ToPointer(true),
},
},
osu.NewTermQuery[string]("Path").Value(resource.Path),
)
if err != nil {
return fmt.Errorf("failed to build delete by query request: %w", err)
}
op := func() error {
resp, err := b.client.Document.DeleteByQuery(context.TODO(), req)
switch {
case err != nil:
return fmt.Errorf("failed to delete by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures)
}
return nil
}
b.mu.Lock()
b.operations = append(b.operations, op)
b.mu.Unlock()
return nil
})
}
func (b *Batch) Push() error {
b.mu.Lock()
defer b.mu.Unlock()
defer func() { // cleanup
b.operations = nil
}()
var bulkOperations []map[string]any
pushBulkOperations := func() error {
if len(bulkOperations) == 0 {
return nil
}
var body strings.Builder
for _, operation := range bulkOperations {
part, err := json.Marshal(operation)
if err != nil {
return fmt.Errorf("failed to marshal bulk operation: %w", err)
}
body.Write(part)
body.WriteString("\n")
}
if _, err := b.client.Bulk(context.Background(), opensearchgoAPI.BulkReq{
Body: strings.NewReader(body.String()),
}); err != nil {
return fmt.Errorf("failed to execute bulk operations: %w", err)
}
bulkOperations = nil
return nil
}
// keep the order of operations in the batch intact,
// unfortunately, operations like DeleteByQuery cannot be part of the bulk API,
// so we need to push the previous bulk operations before executing such operations
// this might lead to smaller bulks than the configured size, but ensures correct order
for _, operation := range b.operations {
switch op := operation.(type) {
case func() []map[string]any:
bulkOperations = append(bulkOperations, op()...)
case func() error:
if err := pushBulkOperations(); err != nil {
return fmt.Errorf("failed to push operations: %w", err)
}
if err := op(); err != nil {
return fmt.Errorf("failed to execute operation: %w", err)
}
}
}
return pushBulkOperations()
}
func (b *Batch) withSizeLimit(f func() error) error {
if err := f(); err != nil {
return err
}
if len(b.operations) >= b.size {
return b.Push()
}
return nil
}

View File

@@ -5,17 +5,18 @@ import (
"strings"
"time"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/opencloud/pkg/conversions"
searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
func OpenSearchHitToMatch(hit opensearchgoAPI.SearchHit) (*searchMessage.Match, error) {
resource, err := conversions.To[engine.Resource](hit.Source)
resource, err := conversions.To[search.Resource](hit.Source)
if err != nil {
return nil, fmt.Errorf("failed to convert hit source: %w", err)
}

View File

@@ -6,7 +6,7 @@ import (
"fmt"
"path"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
//go:embed testdata/*.json
@@ -16,12 +16,12 @@ var Testdata = struct {
Resources resourceTestdata
}{
Resources: resourceTestdata{
File: fromTestData[engine.Resource]("resource_file.json"),
File: fromTestData[search.Resource]("resource_file.json"),
},
}
type resourceTestdata struct {
File engine.Resource
File search.Resource
}
func fromTestData[D any](name string) D {

View File

@@ -0,0 +1,77 @@
package opensearch
import (
"context"
"fmt"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/opencloud/pkg/conversions"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
)
func searchResourceByID(ctx context.Context, client *opensearchgoAPI.Client, index string, id string) (search.Resource, error) {
req, err := osu.BuildSearchReq(
&opensearchgoAPI.SearchReq{
Indices: []string{index},
},
osu.NewIDsQuery(id),
)
if err != nil {
return search.Resource{}, fmt.Errorf("failed to build search request: %w", err)
}
resp, err := client.Search(ctx, req)
switch {
case err != nil:
return search.Resource{}, fmt.Errorf("failed to search for resource: %w", err)
case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0:
return search.Resource{}, fmt.Errorf("document with id %s not found", id)
}
resource, err := conversions.To[search.Resource](resp.Hits.Hits[0].Source)
if err != nil {
return search.Resource{}, fmt.Errorf("failed to convert hit source: %w", err)
}
return resource, nil
}
func updateSelfAndDescendants(ctx context.Context, client *opensearchgoAPI.Client, index string, id string, scriptProvider func(search.Resource) *osu.BodyParamScript) error {
if scriptProvider == nil {
return fmt.Errorf("script cannot be nil")
}
resource, err := searchResourceByID(context.Background(), client, index, id)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
}
req, err := osu.BuildUpdateByQueryReq(
opensearchgoAPI.UpdateByQueryReq{
Indices: []string{index},
Params: opensearchgoAPI.UpdateByQueryParams{
WaitForCompletion: conversions.ToPointer(true),
},
},
osu.NewTermQuery[string]("Path").Value(resource.Path),
osu.UpdateByQueryBodyParams{
Script: scriptProvider(resource),
},
)
if err != nil {
return fmt.Errorf("failed to build update by query request: %w", err)
}
resp, err := client.UpdateByQuery(ctx, req)
switch {
case err != nil:
return fmt.Errorf("failed to update by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to update by query, failures: %v", resp.Failures)
}
return nil
}

View File

@@ -31,7 +31,6 @@ import (
"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"