From 1104846219159d4c4c3bf4bc8e374e0590516c8d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 14 Nov 2024 06:35:46 +0000 Subject: [PATCH] chore(deps): bump github.com/blevesearch/bleve/v2 from 2.4.2 to 2.4.3 Bumps [github.com/blevesearch/bleve/v2](https://github.com/blevesearch/bleve) from 2.4.2 to 2.4.3. - [Release notes](https://github.com/blevesearch/bleve/releases) - [Commits](https://github.com/blevesearch/bleve/compare/v2.4.2...v2.4.3) --- updated-dependencies: - dependency-name: github.com/blevesearch/bleve/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 12 +- go.sum | 24 +- .../github.com/blevesearch/bleve/v2/README.md | 27 +- .../github.com/blevesearch/bleve/v2/index.go | 3 + .../bleve/v2/index/scorch/event.go | 4 + .../bleve/v2/index/scorch/optimize_knn.go | 44 ++- .../bleve/v2/index/scorch/scorch.go | 12 +- .../bleve/v2/index/scorch/snapshot_index.go | 30 +- .../v2/index/scorch/snapshot_index_vr.go | 38 ++- .../v2/index/scorch/snapshot_vector_index.go | 26 ++ .../blevesearch/bleve/v2/index_impl.go | 15 + .../bleve/v2/mapping/mapping_vectors.go | 18 ++ .../bleve/v2/search/collector/eligible.go | 157 ++++++++++ .../bleve/v2/search/collector/slice.go | 4 +- .../bleve/v2/search/query/boolean.go | 9 + .../blevesearch/bleve/v2/search/query/knn.go | 20 +- .../bleve/v2/search/searcher/search_knn.go | 13 +- .../blevesearch/bleve/v2/search_knn.go | 121 +++++++- .../blevesearch/bleve_index_api/index.go | 8 + .../blevesearch/bleve_index_api/vector.go | 10 +- .../bleve_index_api/vector_index.go | 7 +- .../github.com/blevesearch/go-faiss/faiss.go | 11 + .../github.com/blevesearch/go-faiss/index.go | 134 ++++++++- .../blevesearch/go-faiss/index_ivf.go | 8 + .../blevesearch/go-faiss/search_params.go | 97 ++++++- .../blevesearch/go-faiss/selector.go | 30 +- .../scorch_segment_api/v2/segment_vector.go | 6 +- .../github.com/blevesearch/zapx/v15/chunk.go | 17 ++ .../blevesearch/zapx/v15/contentcoder.go | 7 +- .../blevesearch/zapx/v15/docvalues.go | 1 - .../blevesearch/zapx/v15/intDecoder.go | 1 - .../blevesearch/zapx/v15/intcoder.go | 6 +- .../github.com/blevesearch/zapx/v15/merge.go | 4 + vendor/github.com/blevesearch/zapx/v15/new.go | 11 +- .../blevesearch/zapx/v15/posting.go | 13 +- .../github.com/blevesearch/zapx/v16/build.go | 12 +- .../github.com/blevesearch/zapx/v16/chunk.go | 17 ++ .../blevesearch/zapx/v16/contentcoder.go | 7 +- .../blevesearch/zapx/v16/docvalues.go | 1 - .../zapx/v16/faiss_vector_cache.go | 141 ++++++--- .../zapx/v16/faiss_vector_posting.go | 269 ++++++++++++++++-- .../blevesearch/zapx/v16/intDecoder.go | 1 - .../blevesearch/zapx/v16/intcoder.go | 6 +- .../github.com/blevesearch/zapx/v16/merge.go | 26 +- vendor/github.com/blevesearch/zapx/v16/new.go | 33 +-- .../blevesearch/zapx/v16/posting.go | 13 +- .../zapx/v16/section_faiss_vector_index.go | 72 +++-- .../zapx/v16/section_inverted_text_index.go | 11 +- .../blevesearch/zapx/v16/segment.go | 13 +- .../github.com/blevesearch/zapx/v16/write.go | 2 +- vendor/modules.txt | 18 +- 51 files changed, 1321 insertions(+), 269 deletions(-) create mode 100644 vendor/github.com/blevesearch/bleve/v2/search/collector/eligible.go diff --git a/go.mod b/go.mod index a05c3f4ba3..acab018c40 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/Nerzal/gocloak/v13 v13.9.0 github.com/bbalet/stopwords v1.0.0 github.com/beevik/etree v1.4.1 - github.com/blevesearch/bleve/v2 v2.4.2 + github.com/blevesearch/bleve/v2 v2.4.3 github.com/cenkalti/backoff v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.11.0 github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 @@ -135,13 +135,13 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bitly/go-simplejson v0.5.0 // indirect github.com/bits-and-blooms/bitset v1.12.0 // indirect - github.com/blevesearch/bleve_index_api v1.1.10 // indirect + github.com/blevesearch/bleve_index_api v1.1.12 // indirect github.com/blevesearch/geo v0.1.20 // indirect - github.com/blevesearch/go-faiss v1.0.20 // indirect + github.com/blevesearch/go-faiss v1.0.23 // indirect github.com/blevesearch/go-porterstemmer v1.0.3 // indirect github.com/blevesearch/gtreap v0.1.1 // indirect github.com/blevesearch/mmap-go v1.0.4 // indirect - github.com/blevesearch/scorch_segment_api/v2 v2.2.15 // indirect + github.com/blevesearch/scorch_segment_api/v2 v2.2.16 // indirect github.com/blevesearch/segment v0.9.1 // indirect github.com/blevesearch/snowballstem v0.9.0 // indirect github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect @@ -150,8 +150,8 @@ require ( github.com/blevesearch/zapx/v12 v12.3.10 // indirect github.com/blevesearch/zapx/v13 v13.3.10 // indirect github.com/blevesearch/zapx/v14 v14.3.10 // indirect - github.com/blevesearch/zapx/v15 v15.3.13 // indirect - github.com/blevesearch/zapx/v16 v16.1.5 // indirect + github.com/blevesearch/zapx/v15 v15.3.16 // indirect + github.com/blevesearch/zapx/v16 v16.1.8 // indirect github.com/bluele/gcache v0.0.2 // indirect github.com/bombsimon/logrusr/v3 v3.1.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect diff --git a/go.sum b/go.sum index c975e353ae..d31afbccde 100644 --- a/go.sum +++ b/go.sum @@ -149,22 +149,22 @@ github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngE github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= -github.com/blevesearch/bleve/v2 v2.4.2 h1:NooYP1mb3c0StkiY9/xviiq2LGSaE8BQBCc/pirMx0U= -github.com/blevesearch/bleve/v2 v2.4.2/go.mod h1:ATNKj7Yl2oJv/lGuF4kx39bST2dveX6w0th2FFYLkc8= -github.com/blevesearch/bleve_index_api v1.1.10 h1:PDLFhVjrjQWr6jCuU7TwlmByQVCSEURADHdCqVS9+g0= -github.com/blevesearch/bleve_index_api v1.1.10/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= +github.com/blevesearch/bleve/v2 v2.4.3 h1:XDYj+1prgX84L2Cf+V3ojrOPqXxy0qxyd2uLMmeuD+4= +github.com/blevesearch/bleve/v2 v2.4.3/go.mod h1:hEPDPrbYw3vyrm5VOa36GyS4bHWuIf4Fflp7460QQXY= +github.com/blevesearch/bleve_index_api v1.1.12 h1:P4bw9/G/5rulOF7SJ9l4FsDoo7UFJ+5kexNy1RXfegY= +github.com/blevesearch/bleve_index_api v1.1.12/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= github.com/blevesearch/geo v0.1.20 h1:paaSpu2Ewh/tn5DKn/FB5SzvH0EWupxHEIwbCk/QPqM= github.com/blevesearch/geo v0.1.20/go.mod h1:DVG2QjwHNMFmjo+ZgzrIq2sfCh6rIHzy9d9d0B59I6w= -github.com/blevesearch/go-faiss v1.0.20 h1:AIkdTQFWuZ5LQmKQSebgMR4RynGNw8ZseJXaan5kvtI= -github.com/blevesearch/go-faiss v1.0.20/go.mod h1:jrxHrbl42X/RnDPI+wBoZU8joxxuRwedrxqswQ3xfU8= +github.com/blevesearch/go-faiss v1.0.23 h1:Wmc5AFwDLKGl2L6mjLX1Da3vCL0EKa2uHHSorcIS1Uc= +github.com/blevesearch/go-faiss v1.0.23/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y= github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk= github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= -github.com/blevesearch/scorch_segment_api/v2 v2.2.15 h1:prV17iU/o+A8FiZi9MXmqbagd8I0bCqM7OKUYPbnb5Y= -github.com/blevesearch/scorch_segment_api/v2 v2.2.15/go.mod h1:db0cmP03bPNadXrCDuVkKLV6ywFSiRgPFT1YVrestBc= +github.com/blevesearch/scorch_segment_api/v2 v2.2.16 h1:uGvKVvG7zvSxCwcm4/ehBa9cCEuZVE+/zvrSl57QUVY= +github.com/blevesearch/scorch_segment_api/v2 v2.2.16/go.mod h1:VF5oHVbIFTu+znY1v30GjSpT5+9YFs9dV2hjvuh34F0= github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU= github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw= github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= @@ -181,10 +181,10 @@ github.com/blevesearch/zapx/v13 v13.3.10 h1:0KY9tuxg06rXxOZHg3DwPJBjniSlqEgVpxIq github.com/blevesearch/zapx/v13 v13.3.10/go.mod h1:w2wjSDQ/WBVeEIvP0fvMJZAzDwqwIEzVPnCPrz93yAk= github.com/blevesearch/zapx/v14 v14.3.10 h1:SG6xlsL+W6YjhX5N3aEiL/2tcWh3DO75Bnz77pSwwKU= github.com/blevesearch/zapx/v14 v14.3.10/go.mod h1:qqyuR0u230jN1yMmE4FIAuCxmahRQEOehF78m6oTgns= -github.com/blevesearch/zapx/v15 v15.3.13 h1:6EkfaZiPlAxqXz0neniq35my6S48QI94W/wyhnpDHHQ= -github.com/blevesearch/zapx/v15 v15.3.13/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg= -github.com/blevesearch/zapx/v16 v16.1.5 h1:b0sMcarqNFxuXvjoXsF8WtwVahnxyhEvBSRJi/AUHjU= -github.com/blevesearch/zapx/v16 v16.1.5/go.mod h1:J4mSF39w1QELc11EWRSBFkPeZuO7r/NPKkHzDCoiaI8= +github.com/blevesearch/zapx/v15 v15.3.16 h1:Ct3rv7FUJPfPk99TI/OofdC+Kpb4IdyfdMH48sb+FmE= +github.com/blevesearch/zapx/v15 v15.3.16/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg= +github.com/blevesearch/zapx/v16 v16.1.8 h1:Bxzpw6YQpFs7UjoCV1+RvDw6fmAT2GZxldwX8b3wVBM= +github.com/blevesearch/zapx/v16 v16.1.8/go.mod h1:JqQlOqlRVaYDkpLIl3JnKql8u4zKTNlVEa3nLsi0Gn8= github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw= github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= diff --git a/vendor/github.com/blevesearch/bleve/v2/README.md b/vendor/github.com/blevesearch/bleve/v2/README.md index 80499ec537..fa75ef3db1 100644 --- a/vendor/github.com/blevesearch/bleve/v2/README.md +++ b/vendor/github.com/blevesearch/bleve/v2/README.md @@ -9,28 +9,29 @@ [![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -A modern indexing library in GO +A modern indexing + search library in GO ## Features -* Index any go data structure (including JSON) -* Intelligent defaults backed up by powerful configuration +* Index any GO data structure or JSON +* Intelligent defaults backed up by powerful configuration ([scorch](https://github.com/blevesearch/bleve/blob/master/index/scorch/README.md)) * Supported field types: * `text`, `number`, `datetime`, `boolean`, `geopoint`, `geoshape`, `IP`, `vector` * Supported query types: - * Term, Phrase, Match, Match Phrase, Prefix, Fuzzy - * Conjunction, Disjunction, Boolean (`must`/`should`/`must_not`) - * Term Range, Numeric Range, Date Range - * [Geo Spatial](https://github.com/blevesearch/bleve/blob/master/geo/README.md) - * Simple [query string syntax](http://www.blevesearch.com/docs/Query-String-Query/) - * Approximate k-nearest neighbors over [vectors](https://github.com/blevesearch/bleve/blob/master/docs/vectors.md) -* [tf-idf](https://en.wikipedia.org/wiki/Tf-idf) Scoring + * `term`, `phrase`, `match`, `match_phrase`, `prefix`, `regexp`, `wildcard`, `fuzzy` + * term range, numeric range, date range, boolean field + * compound queries: `conjuncts`, `disjuncts`, boolean (`must`/`should`/`must_not`) + * [query string syntax](http://www.blevesearch.com/docs/Query-String-Query/) + * [geo spatial search](https://github.com/blevesearch/bleve/blob/master/geo/README.md) + * approximate k-nearest neighbors via [vector search](https://github.com/blevesearch/bleve/blob/master/docs/vectors.md) +* [tf-idf](https://en.wikipedia.org/wiki/Tf-idf) scoring +* Hybrid search: exact + semantic * Query time boosting * Search result match highlighting with document fragments * Aggregations/faceting support: - * Terms Facet - * Numeric Range Facet - * Date Range Facet + * terms facet + * numeric range facet + * date range facet ## Indexing diff --git a/vendor/github.com/blevesearch/bleve/v2/index.go b/vendor/github.com/blevesearch/bleve/v2/index.go index 7d4c9be9be..acbefc695d 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index.go +++ b/vendor/github.com/blevesearch/bleve/v2/index.go @@ -46,6 +46,9 @@ func (b *Batch) Index(id string, data interface{}) error { if id == "" { return ErrorEmptyID } + if eventIndex, ok := b.index.(index.EventIndex); ok { + eventIndex.FireIndexEvent() + } doc := document.NewDocument(id) err := b.index.Mapping().MapDocument(doc, data) if err != nil { diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/event.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/event.go index 0f653ccf49..e0bc3b60ac 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/event.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/event.go @@ -67,3 +67,7 @@ var EventKindMergeTaskIntroduction = EventKind(8) // EventKindPreMergeCheck is fired before the merge begins to check if // the caller should proceed with the merge. var EventKindPreMergeCheck = EventKind(9) + +// EventKindIndexStart is fired when Index() is invoked which +// creates a new Document object from an interface using the index mapping. +var EventKindIndexStart = EventKind(10) diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/optimize_knn.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/optimize_knn.go index 6b10a207cf..ca179574c7 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/optimize_knn.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/optimize_knn.go @@ -23,6 +23,7 @@ import ( "sync" "sync/atomic" + "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/v2/search" index "github.com/blevesearch/bleve_index_api" segment_api "github.com/blevesearch/scorch_segment_api/v2" @@ -34,6 +35,8 @@ type OptimizeVR struct { totalCost uint64 // maps field to vector readers vrs map[string][]*IndexSnapshotVectorReader + // if at least one of the vector readers requires filtered kNN. + requiresFiltering bool } // This setting _MUST_ only be changed during init and not after. @@ -62,6 +65,11 @@ func (o *OptimizeVR) Finish() error { var errorsM sync.Mutex var errors []error + var snapshotGlobalDocNums map[int]*roaring.Bitmap + if o.requiresFiltering { + snapshotGlobalDocNums = o.snapshot.globalDocNums() + } + defer o.invokeSearcherEndCallback() wg := sync.WaitGroup{} @@ -77,7 +85,8 @@ func (o *OptimizeVR) Finish() error { wg.Done() }() for field, vrs := range o.vrs { - vecIndex, err := segment.InterpretVectorIndex(field, origSeg.deleted) + vecIndex, err := segment.InterpretVectorIndex(field, + o.requiresFiltering, origSeg.deleted) if err != nil { errorsM.Lock() errors = append(errors, err) @@ -89,9 +98,37 @@ func (o *OptimizeVR) Finish() error { vectorIndexSize := vecIndex.Size() origSeg.cachedMeta.updateMeta(field, vectorIndexSize) for _, vr := range vrs { + var pl segment_api.VecPostingsList + var err error + // for each VR, populate postings list and iterators // by passing the obtained vector index and getting similar vectors. - pl, err := vecIndex.Search(vr.vector, vr.k, vr.searchParams) + + // Only applies to filtered kNN. + if vr.eligibleDocIDs != nil && len(vr.eligibleDocIDs) > 0 { + eligibleVectorInternalIDs := vr.getEligibleDocIDs() + if snapshotGlobalDocNums != nil { + // Only the eligible documents belonging to this segment + // will get filtered out. + // There is no way to determine which doc belongs to which segment + eligibleVectorInternalIDs.And(snapshotGlobalDocNums[index]) + } + + eligibleLocalDocNums := make([]uint64, + eligibleVectorInternalIDs.GetCardinality()) + // get the (segment-)local document numbers + for i, docNum := range eligibleVectorInternalIDs.ToArray() { + localDocNum := o.snapshot.localDocNumFromGlobal(index, + uint64(docNum)) + eligibleLocalDocNums[i] = localDocNum + } + + pl, err = vecIndex.SearchWithFilter(vr.vector, vr.k, + eligibleLocalDocNums, vr.searchParams) + } else { + pl, err = vecIndex.Search(vr.vector, vr.k, vr.searchParams) + } + if err != nil { errorsM.Lock() errors = append(errors, err) @@ -140,6 +177,9 @@ func (s *IndexSnapshotVectorReader) VectorOptimize(ctx context.Context, return octx, nil } o.ctx = ctx + if !o.requiresFiltering { + o.requiresFiltering = len(s.eligibleDocIDs) > 0 + } if o.snapshot != s.snapshot { o.invokeSearcherEndCallback() diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/scorch.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/scorch.go index 7966d844d0..429d1daa91 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/scorch.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/scorch.go @@ -49,7 +49,7 @@ type Scorch struct { unsafeBatch bool - rootLock sync.RWMutex + rootLock sync.RWMutex root *IndexSnapshot // holds 1 ref-count on the root rootPersisted []chan error // closed when root is persisted @@ -376,6 +376,8 @@ func (s *Scorch) Delete(id string) error { func (s *Scorch) Batch(batch *index.Batch) (err error) { start := time.Now() + // notify handlers that we're about to index a batch of data + s.fireEvent(EventKindBatchIntroductionStart, 0) defer func() { s.fireEvent(EventKindBatchIntroduction, time.Since(start)) }() @@ -434,9 +436,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { indexStart := time.Now() - // notify handlers that we're about to introduce a segment - s.fireEvent(EventKindBatchIntroductionStart, 0) - var newSegment segment.Segment var bufBytes uint64 stats := newFieldStats() @@ -878,3 +877,8 @@ func (s *Scorch) CopyReader() index.CopyReader { s.rootLock.Unlock() return rv } + +// external API to fire a scorch event (EventKindIndexStart) externally from bleve +func (s *Scorch) FireIndexEvent() { + s.fireEvent(EventKindIndexStart, 0) +} diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index.go index f0e7ae1cf7..79840a41ff 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index.go @@ -471,16 +471,44 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) { return rvd, nil } +// In a multi-segment index, each document has: +// 1. a local docnum - local to the segment +// 2. a global docnum - unique identifier across the index +// This function returns the segment index(the segment in which the docnum is present) +// and local docnum of a document. func (is *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int, uint64) { segmentIndex := sort.Search(len(is.offsets), func(x int) bool { return is.offsets[x] > docNum }) - 1 - localDocNum := docNum - is.offsets[segmentIndex] + localDocNum := is.localDocNumFromGlobal(segmentIndex, docNum) return int(segmentIndex), localDocNum } +// This function returns the local docnum, given the segment index and global docnum +func (is *IndexSnapshot) localDocNumFromGlobal(segmentIndex int, docNum uint64) uint64 { + return docNum - is.offsets[segmentIndex] +} + +// Function to return a mapping of the segment index to the live global doc nums +// in the segment of the specified index snapshot. +func (is *IndexSnapshot) globalDocNums() map[int]*roaring.Bitmap { + if len(is.segment) == 0 { + return nil + } + + segmentIndexGlobalDocNums := make(map[int]*roaring.Bitmap) + + for i := range is.segment { + segmentIndexGlobalDocNums[i] = roaring.NewBitmap() + for _, localDocNum := range is.segment[i].DocNumbersLive().ToArray() { + segmentIndexGlobalDocNums[i].Add(localDocNum + uint32(is.offsets[i])) + } + } + return segmentIndexGlobalDocNums +} + func (is *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { docNum, err := docInternalToNumber(id) if err != nil { diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index_vr.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index_vr.go index 30e03dcba8..320364bc77 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index_vr.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_index_vr.go @@ -24,6 +24,7 @@ import ( "fmt" "reflect" + "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/v2/size" index "github.com/blevesearch/bleve_index_api" segment_api "github.com/blevesearch/scorch_segment_api/v2" @@ -51,6 +52,31 @@ type IndexSnapshotVectorReader struct { ctx context.Context searchParams json.RawMessage + + // The following fields are only applicable for vector readers which will + // process pre-filtered kNN queries. + eligibleDocIDs []index.IndexInternalID +} + +// Function to convert the internal IDs of the eligible documents to a type suitable +// for addition to a bitmap. +// Useful to have the eligible doc IDs in a bitmap to leverage the fast intersection +// (AND) operations. Eg. finding the eligible doc IDs present in a segment. +func (i *IndexSnapshotVectorReader) getEligibleDocIDs() *roaring.Bitmap { + res := roaring.NewBitmap() + if len(i.eligibleDocIDs) > 0 { + internalDocIDs := make([]uint32, 0, len(i.eligibleDocIDs)) + // converts the doc IDs to uint32 and returns + for _, eligibleDocInternalID := range i.eligibleDocIDs { + internalDocID, err := docInternalToNumber(index.IndexInternalID(eligibleDocInternalID)) + if err != nil { + continue + } + internalDocIDs = append(internalDocIDs, uint32(internalDocID)) + } + res.AddMany(internalDocIDs) + } + return res } func (i *IndexSnapshotVectorReader) Size() int { @@ -108,7 +134,17 @@ func (i *IndexSnapshotVectorReader) Advance(ID index.IndexInternalID, preAlloced *index.VectorDoc) (*index.VectorDoc, error) { if i.currPosting != nil && bytes.Compare(i.currID, ID) >= 0 { - i2, err := i.snapshot.VectorReader(i.ctx, i.vector, i.field, i.k, i.searchParams) + var i2 index.VectorReader + var err error + + if len(i.eligibleDocIDs) > 0 { + i2, err = i.snapshot.VectorReaderWithFilter(i.ctx, i.vector, i.field, + i.k, i.searchParams, i.eligibleDocIDs) + } else { + i2, err = i.snapshot.VectorReader(i.ctx, i.vector, i.field, i.k, + i.searchParams) + } + if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_vector_index.go b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_vector_index.go index 70546d4e37..bcb05024d2 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_vector_index.go +++ b/vendor/github.com/blevesearch/bleve/v2/index/scorch/snapshot_vector_index.go @@ -48,3 +48,29 @@ func (is *IndexSnapshot) VectorReader(ctx context.Context, vector []float32, return rv, nil } + +func (is *IndexSnapshot) VectorReaderWithFilter(ctx context.Context, vector []float32, + field string, k int64, searchParams json.RawMessage, + filterIDs []index.IndexInternalID) ( + index.VectorReader, error) { + + rv := &IndexSnapshotVectorReader{ + vector: vector, + field: field, + k: k, + snapshot: is, + searchParams: searchParams, + eligibleDocIDs: filterIDs, + } + + if rv.postings == nil { + rv.postings = make([]segment_api.VecPostingsList, len(is.segment)) + } + if rv.iterators == nil { + rv.iterators = make([]segment_api.VecPostingsIterator, len(is.segment)) + } + + // initialize postings and iterators within the OptimizeVR's Finish() + + return rv, nil +} diff --git a/vendor/github.com/blevesearch/bleve/v2/index_impl.go b/vendor/github.com/blevesearch/bleve/v2/index_impl.go index 55212e3e68..e6debf17ad 100644 --- a/vendor/github.com/blevesearch/bleve/v2/index_impl.go +++ b/vendor/github.com/blevesearch/bleve/v2/index_impl.go @@ -256,6 +256,8 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) { return ErrorIndexClosed } + i.FireIndexEvent() + doc := document.NewDocument(id) err = i.m.MapDocument(doc, data) if err != nil { @@ -1112,3 +1114,16 @@ func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser, return os.OpenFile(filepath.Join(string(f), dir, file), os.O_RDWR|os.O_CREATE, 0600) } + +func (i *indexImpl) FireIndexEvent() { + // get the internal index implementation + internalIndex, err := i.Advanced() + if err != nil { + return + } + // check if the internal index implementation supports events + if internalEventIndex, ok := internalIndex.(index.EventIndex); ok { + // fire the Index() event + internalEventIndex.FireIndexEvent() + } +} diff --git a/vendor/github.com/blevesearch/bleve/v2/mapping/mapping_vectors.go b/vendor/github.com/blevesearch/bleve/v2/mapping/mapping_vectors.go index a3879c4bd7..dbfde1fb06 100644 --- a/vendor/github.com/blevesearch/bleve/v2/mapping/mapping_vectors.go +++ b/vendor/github.com/blevesearch/bleve/v2/mapping/mapping_vectors.go @@ -24,6 +24,7 @@ import ( "github.com/blevesearch/bleve/v2/document" "github.com/blevesearch/bleve/v2/util" index "github.com/blevesearch/bleve_index_api" + faiss "github.com/blevesearch/go-faiss" ) // Min and Max allowed dimensions for a vector field; @@ -140,6 +141,10 @@ func (fm *FieldMapping) processVector(propertyMightBeVector interface{}, if !ok { return false } + // normalize raw vector if similarity is cosine + if fm.Similarity == index.CosineSimilarity { + vector = NormalizeVector(vector) + } fieldName := getFieldName(pathString, path, fm) options := fm.Options() @@ -163,6 +168,10 @@ func (fm *FieldMapping) processVectorBase64(propertyMightBeVectorBase64 interfac if err != nil || len(decodedVector) != fm.Dims { return } + // normalize raw vector if similarity is cosine + if fm.Similarity == index.CosineSimilarity { + decodedVector = NormalizeVector(decodedVector) + } fieldName := getFieldName(pathString, path, fm) options := fm.Options() @@ -252,3 +261,12 @@ func validateVectorFieldAlias(field *FieldMapping, parentName string, return nil } + +func NormalizeVector(vec []float32) []float32 { + // make a copy of the vector to avoid modifying the original + // vector in-place + vecCopy := make([]float32, len(vec)) + copy(vecCopy, vec) + // normalize the vector copy using in-place normalization provided by faiss + return faiss.NormalizeVector(vecCopy) +} diff --git a/vendor/github.com/blevesearch/bleve/v2/search/collector/eligible.go b/vendor/github.com/blevesearch/bleve/v2/search/collector/eligible.go new file mode 100644 index 0000000000..5590290b06 --- /dev/null +++ b/vendor/github.com/blevesearch/bleve/v2/search/collector/eligible.go @@ -0,0 +1,157 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "fmt" + "time" + + "github.com/blevesearch/bleve/v2/search" + index "github.com/blevesearch/bleve_index_api" +) + +type EligibleCollector struct { + size int + total uint64 + took time.Duration + results search.DocumentMatchCollection + + ids []index.IndexInternalID +} + +func NewEligibleCollector(size int) *EligibleCollector { + return newEligibleCollector(size) +} + +func newEligibleCollector(size int) *EligibleCollector { + // No sort order & skip always 0 since this is only to filter eligible docs. + ec := &EligibleCollector{size: size, + ids: make([]index.IndexInternalID, 0, size), + } + return ec +} + +func makeEligibleDocumentMatchHandler(ctx *search.SearchContext) (search.DocumentMatchHandler, error) { + if ec, ok := ctx.Collector.(*EligibleCollector); ok { + return func(d *search.DocumentMatch) error { + if d == nil { + return nil + } + + copyOfID := make([]byte, len(d.IndexInternalID)) + copy(copyOfID, d.IndexInternalID) + ec.ids = append(ec.ids, copyOfID) + + // recycle the DocumentMatch + ctx.DocumentMatchPool.Put(d) + + return nil + }, nil + } + + return nil, fmt.Errorf("eligiblity collector not available") +} + +func (ec *EligibleCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error { + startTime := time.Now() + var err error + var next *search.DocumentMatch + + backingSize := ec.size + if backingSize > PreAllocSizeSkipCap { + backingSize = PreAllocSizeSkipCap + 1 + } + searchContext := &search.SearchContext{ + DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), 0), + Collector: ec, + IndexReader: reader, + } + + dmHandler, err := makeEligibleDocumentMatchHandler(searchContext) + if err != nil { + return err + } + select { + case <-ctx.Done(): + search.RecordSearchCost(ctx, search.AbortM, 0) + return ctx.Err() + default: + next, err = searcher.Next(searchContext) + } + for err == nil && next != nil { + if ec.total%CheckDoneEvery == 0 { + select { + case <-ctx.Done(): + search.RecordSearchCost(ctx, search.AbortM, 0) + return ctx.Err() + default: + } + } + ec.total++ + + err = dmHandler(next) + if err != nil { + break + } + + next, err = searcher.Next(searchContext) + } + if err != nil { + return err + } + + // help finalize/flush the results in case + // of custom document match handlers. + err = dmHandler(nil) + if err != nil { + return err + } + + // compute search duration + ec.took = time.Since(startTime) + + return nil +} + +func (ec *EligibleCollector) Results() search.DocumentMatchCollection { + return nil +} + +func (ec *EligibleCollector) IDs() []index.IndexInternalID { + return ec.ids +} + +func (ec *EligibleCollector) Total() uint64 { + return ec.total +} + +// No concept of scoring in the eligible collector. +func (ec *EligibleCollector) MaxScore() float64 { + return 0 +} + +func (ec *EligibleCollector) Took() time.Duration { + return ec.took +} + +func (ec *EligibleCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) { + // facet unsupported for pre-filtering in KNN search +} + +func (ec *EligibleCollector) FacetResults() search.FacetResults { + // facet unsupported for pre-filtering in KNN search + return nil +} diff --git a/vendor/github.com/blevesearch/bleve/v2/search/collector/slice.go b/vendor/github.com/blevesearch/bleve/v2/search/collector/slice.go index 07534e6934..6120921cb7 100644 --- a/vendor/github.com/blevesearch/bleve/v2/search/collector/slice.go +++ b/vendor/github.com/blevesearch/bleve/v2/search/collector/slice.go @@ -14,7 +14,9 @@ package collector -import "github.com/blevesearch/bleve/v2/search" +import ( + "github.com/blevesearch/bleve/v2/search" +) type collectStoreSlice struct { slice search.DocumentMatchCollection diff --git a/vendor/github.com/blevesearch/bleve/v2/search/query/boolean.go b/vendor/github.com/blevesearch/bleve/v2/search/query/boolean.go index 026a58688f..734dfd1369 100644 --- a/vendor/github.com/blevesearch/bleve/v2/search/query/boolean.go +++ b/vendor/github.com/blevesearch/bleve/v2/search/query/boolean.go @@ -74,6 +74,9 @@ func (q *BooleanQuery) SetMinShould(minShould float64) { } func (q *BooleanQuery) AddMust(m ...Query) { + if m == nil { + return + } if q.Must == nil { tmp := NewConjunctionQuery([]Query{}) tmp.queryStringMode = q.queryStringMode @@ -85,6 +88,9 @@ func (q *BooleanQuery) AddMust(m ...Query) { } func (q *BooleanQuery) AddShould(m ...Query) { + if m == nil { + return + } if q.Should == nil { tmp := NewDisjunctionQuery([]Query{}) tmp.queryStringMode = q.queryStringMode @@ -96,6 +102,9 @@ func (q *BooleanQuery) AddShould(m ...Query) { } func (q *BooleanQuery) AddMustNot(m ...Query) { + if m == nil { + return + } if q.MustNot == nil { tmp := NewDisjunctionQuery([]Query{}) tmp.queryStringMode = q.queryStringMode diff --git a/vendor/github.com/blevesearch/bleve/v2/search/query/knn.go b/vendor/github.com/blevesearch/bleve/v2/search/query/knn.go index 17e8554168..4d105d9435 100644 --- a/vendor/github.com/blevesearch/bleve/v2/search/query/knn.go +++ b/vendor/github.com/blevesearch/bleve/v2/search/query/knn.go @@ -35,7 +35,9 @@ type KNNQuery struct { BoostVal *Boost `json:"boost,omitempty"` // see KNNRequest.Params for description - Params json.RawMessage `json:"params"` + Params json.RawMessage `json:"params"` + FilterQuery Query `json:"filter,omitempty"` + filterResults []index.IndexInternalID } func NewKNNQuery(vector []float32) *KNNQuery { @@ -67,6 +69,14 @@ func (q *KNNQuery) SetParams(params json.RawMessage) { q.Params = params } +func (q *KNNQuery) SetFilterQuery(f Query) { + q.FilterQuery = f +} + +func (q *KNNQuery) SetFilterResults(results []index.IndexInternalID) { + q.filterResults = results +} + func (q *KNNQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) { fieldMapping := m.FieldMappingForPath(q.VectorField) @@ -77,6 +87,12 @@ func (q *KNNQuery) Searcher(ctx context.Context, i index.IndexReader, if q.K <= 0 || len(q.Vector) == 0 { return nil, fmt.Errorf("k must be greater than 0 and vector must be non-empty") } + if similarityMetric == index.CosineSimilarity { + // normalize the vector + q.Vector = mapping.NormalizeVector(q.Vector) + } + return searcher.NewKNNSearcher(ctx, i, m, options, q.VectorField, - q.Vector, q.K, q.BoostVal.Value(), similarityMetric, q.Params) + q.Vector, q.K, q.BoostVal.Value(), similarityMetric, q.Params, + q.filterResults) } diff --git a/vendor/github.com/blevesearch/bleve/v2/search/searcher/search_knn.go b/vendor/github.com/blevesearch/bleve/v2/search/searcher/search_knn.go index e17bb7a0f6..866900d4e2 100644 --- a/vendor/github.com/blevesearch/bleve/v2/search/searcher/search_knn.go +++ b/vendor/github.com/blevesearch/bleve/v2/search/searcher/search_knn.go @@ -49,11 +49,20 @@ type KNNSearcher struct { func NewKNNSearcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions, field string, vector []float32, k int64, - boost float64, similarityMetric string, searchParams json.RawMessage) ( + boost float64, similarityMetric string, searchParams json.RawMessage, + filterIDs []index.IndexInternalID) ( search.Searcher, error) { if vr, ok := i.(index.VectorIndexReader); ok { - vectorReader, err := vr.VectorReader(ctx, vector, field, k, searchParams) + var vectorReader index.VectorReader + var err error + + if len(filterIDs) > 0 { + vectorReader, err = vr.VectorReaderWithFilter(ctx, vector, field, k, + searchParams, filterIDs) + } else { + vectorReader, err = vr.VectorReader(ctx, vector, field, k, searchParams) + } if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/bleve/v2/search_knn.go b/vendor/github.com/blevesearch/bleve/v2/search_knn.go index 008a3615c8..309b36593a 100644 --- a/vendor/github.com/blevesearch/bleve/v2/search_knn.go +++ b/vendor/github.com/blevesearch/bleve/v2/search_knn.go @@ -87,6 +87,10 @@ type KNNRequest struct { // // Consult go-faiss to know all supported search params Params json.RawMessage `json:"params"` + + // Filter query to use with kNN pre-filtering. + // Supports pre-filtering with all existing types of query clauses. + FilterQuery query.Query `json:"filter,omitempty"` } func (r *SearchRequest) AddKNN(field string, vector []float32, k int64, boost float64) { @@ -99,6 +103,18 @@ func (r *SearchRequest) AddKNN(field string, vector []float32, k int64, boost fl }) } +func (r *SearchRequest) AddKNNWithFilter(field string, vector []float32, k int64, + boost float64, filterQuery query.Query) { + b := query.Boost(boost) + r.KNN = append(r.KNN, &KNNRequest{ + Field: field, + Vector: vector, + K: k, + Boost: &b, + FilterQuery: filterQuery, + }) +} + func (r *SearchRequest) AddKNNOperator(operator knnOperator) { r.KNNOperator = operator } @@ -106,6 +122,16 @@ func (r *SearchRequest) AddKNNOperator(operator knnOperator) { // UnmarshalJSON deserializes a JSON representation of // a SearchRequest func (r *SearchRequest) UnmarshalJSON(input []byte) error { + type tempKNNReq struct { + Field string `json:"field"` + Vector []float32 `json:"vector"` + VectorBase64 string `json:"vector_base64"` + K int64 `json:"k"` + Boost *query.Boost `json:"boost,omitempty"` + Params json.RawMessage `json:"params"` + FilterQuery json.RawMessage `json:"filter,omitempty"` + } + var temp struct { Q json.RawMessage `json:"query"` Size *int `json:"size"` @@ -119,7 +145,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { Score string `json:"score"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` - KNN []*KNNRequest `json:"knn"` + KNN []*tempKNNReq `json:"knn"` KNNOperator knnOperator `json:"knn_operator"` PreSearchData json.RawMessage `json:"pre_search_data"` } @@ -163,7 +189,22 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { r.From = 0 } - r.KNN = temp.KNN + r.KNN = make([]*KNNRequest, len(temp.KNN)) + for i, knnReq := range temp.KNN { + r.KNN[i] = &KNNRequest{} + r.KNN[i].Field = temp.KNN[i].Field + r.KNN[i].Vector = temp.KNN[i].Vector + r.KNN[i].VectorBase64 = temp.KNN[i].VectorBase64 + r.KNN[i].K = temp.KNN[i].K + r.KNN[i].Boost = temp.KNN[i].Boost + r.KNN[i].Params = temp.KNN[i].Params + if len(knnReq.FilterQuery) == 0 { + // Setting this to nil to avoid ParseQuery() setting it to a match none + r.KNN[i].FilterQuery = nil + } else { + r.KNN[i].FilterQuery, err = query.ParseQuery(knnReq.FilterQuery) + } + } r.KNNOperator = temp.KNNOperator if r.KNNOperator == "" { r.KNNOperator = knnOperatorOr @@ -209,7 +250,9 @@ var ( knnOperatorOr = knnOperator("or") ) -func createKNNQuery(req *SearchRequest) (query.Query, []int64, int64, error) { +func createKNNQuery(req *SearchRequest, eligibleDocsMap map[int][]index.IndexInternalID, + requiresFiltering map[int]bool) ( + query.Query, []int64, int64, error) { if requestHasKNN(req) { // first perform validation err := validateKNN(req) @@ -219,12 +262,25 @@ func createKNNQuery(req *SearchRequest) (query.Query, []int64, int64, error) { var subQueries []query.Query kArray := make([]int64, 0, len(req.KNN)) sumOfK := int64(0) - for _, knn := range req.KNN { + for i, knn := range req.KNN { + // If it's a filtered kNN but has no eligible filter hits, then + // do not run the kNN query. + if requiresFiltering[i] && len(eligibleDocsMap[i]) <= 0 { + continue + } + knnQuery := query.NewKNNQuery(knn.Vector) knnQuery.SetFieldVal(knn.Field) knnQuery.SetK(knn.K) knnQuery.SetBoost(knn.Boost.Value()) knnQuery.SetParams(knn.Params) + if len(eligibleDocsMap[i]) > 0 { + knnQuery.SetFilterQuery(knn.FilterQuery) + filterResults, exists := eligibleDocsMap[i] + if exists { + knnQuery.SetFilterResults(filterResults) + } + } subQueries = append(subQueries, knnQuery) kArray = append(kArray, knn.K) sumOfK += knn.K @@ -303,7 +359,62 @@ func addSortAndFieldsToKNNHits(req *SearchRequest, knnHits []*search.DocumentMat } func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, reader index.IndexReader, preSearch bool) ([]*search.DocumentMatch, error) { - KNNQuery, kArray, sumOfK, err := createKNNQuery(req) + // maps the index of the KNN query in the req to the pre-filter hits aka + // eligible docs' internal IDs . + filterHitsMap := make(map[int][]index.IndexInternalID) + // Indicates if this query requires filtering downstream + // No filtering required if it's a match all query/no filters applied. + requiresFiltering := make(map[int]bool) + + for idx, knnReq := range req.KNN { + // TODO Can use goroutines for this filter query stuff - do it if perf results + // show this to be significantly slow otherwise. + filterQ := knnReq.FilterQuery + if filterQ == nil { + requiresFiltering[idx] = false + continue + } + + if _, ok := filterQ.(*query.MatchAllQuery); ok { + // Equivalent to not having a filter query. + requiresFiltering[idx] = false + continue + } + + if _, ok := filterQ.(*query.MatchNoneQuery); ok { + // Filtering required since no hits are eligible. + requiresFiltering[idx] = true + // a match none query just means none the documents are eligible + // hence, we can save on running the query. + continue + } + + // Applies to all supported types of queries. + filterSearcher, _ := filterQ.Searcher(ctx, reader, i.m, search.SearcherOptions{ + Score: "none", // just want eligible hits --> don't compute scores if not needed + }) + // Using the index doc count to determine collector size since we do not + // have an estimate of the number of eligible docs in the index yet. + indexDocCount, err := i.DocCount() + if err != nil { + return nil, err + } + filterColl := collector.NewEligibleCollector(int(indexDocCount)) + err = filterColl.Collect(ctx, filterSearcher, reader) + if err != nil { + return nil, err + } + filterHits := filterColl.IDs() + if len(filterHits) > 0 { + filterHitsMap[idx] = filterHits + } + // set requiresFiltering regardless of whether there're filtered hits or + // not to later decide whether to consider the knnQuery or not + requiresFiltering[idx] = true + } + + // Add the filter hits when creating the kNN query + KNNQuery, kArray, sumOfK, err := createKNNQuery(req, filterHitsMap, requiresFiltering) if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/bleve_index_api/index.go b/vendor/github.com/blevesearch/bleve_index_api/index.go index a0035560e5..c2125d6609 100644 --- a/vendor/github.com/blevesearch/bleve_index_api/index.go +++ b/vendor/github.com/blevesearch/bleve_index_api/index.go @@ -57,6 +57,14 @@ type CopyIndex interface { CopyReader() CopyReader } +// EventIndex is an optional interface for exposing the support for firing event +// callbacks for various events in the index. +type EventIndex interface { + // FireIndexEvent is used to fire an event callback when Index() is called, + // to notify the caller that a document has been added to the index. + FireIndexEvent() +} + type IndexReader interface { TermFieldReader(ctx context.Context, term []byte, field string, includeFreq, includeNorm, includeTermVectors bool) (TermFieldReader, error) diff --git a/vendor/github.com/blevesearch/bleve_index_api/vector.go b/vendor/github.com/blevesearch/bleve_index_api/vector.go index 3eff52cae2..c1b5837a58 100644 --- a/vendor/github.com/blevesearch/bleve_index_api/vector.go +++ b/vendor/github.com/blevesearch/bleve_index_api/vector.go @@ -32,12 +32,9 @@ type VectorField interface { const ( EuclideanDistance = "l2_norm" - // dotProduct(vecA, vecB) = vecA . vecB = |vecA| * |vecB| * cos(theta); - // where, theta is the angle between vecA and vecB - // If vecA and vecB are normalized (unit magnitude), then - // vecA . vecB = cos(theta), which is the cosine similarity. - // Thus, we don't need a separate similarity type for cosine similarity - CosineSimilarity = "dot_product" + InnerProduct = "dot_product" + + CosineSimilarity = "cosine" ) const DefaultSimilarityMetric = EuclideanDistance @@ -45,6 +42,7 @@ const DefaultSimilarityMetric = EuclideanDistance // Supported similarity metrics for vector fields var SupportedSimilarityMetrics = map[string]struct{}{ EuclideanDistance: {}, + InnerProduct: {}, CosineSimilarity: {}, } diff --git a/vendor/github.com/blevesearch/bleve_index_api/vector_index.go b/vendor/github.com/blevesearch/bleve_index_api/vector_index.go index da0a74ae96..d1a4ca3fe7 100644 --- a/vendor/github.com/blevesearch/bleve_index_api/vector_index.go +++ b/vendor/github.com/blevesearch/bleve_index_api/vector_index.go @@ -48,8 +48,11 @@ type VectorReader interface { } type VectorIndexReader interface { - VectorReader(ctx context.Context, vector []float32, field string, k int64, searchParams json.RawMessage) ( - VectorReader, error) + VectorReader(ctx context.Context, vector []float32, field string, k int64, + searchParams json.RawMessage) (VectorReader, error) + + VectorReaderWithFilter(ctx context.Context, vector []float32, field string, k int64, + searchParams json.RawMessage, filterIDs []IndexInternalID) (VectorReader, error) } type VectorDoc struct { diff --git a/vendor/github.com/blevesearch/go-faiss/faiss.go b/vendor/github.com/blevesearch/go-faiss/faiss.go index 4a73f760fe..a7087e7459 100644 --- a/vendor/github.com/blevesearch/go-faiss/faiss.go +++ b/vendor/github.com/blevesearch/go-faiss/faiss.go @@ -9,6 +9,7 @@ package faiss #include #include +#include */ import "C" import "errors" @@ -28,3 +29,13 @@ const ( MetricBrayCurtis = C.METRIC_BrayCurtis MetricJensenShannon = C.METRIC_JensenShannon ) + +// In-place normalization of provided vector (single) +func NormalizeVector(vector []float32) []float32 { + C.faiss_fvec_renorm_L2( + C.size_t(len(vector)), + 1, // number of vectors + (*C.float)(&vector[0])) + + return vector +} diff --git a/vendor/github.com/blevesearch/go-faiss/index.go b/vendor/github.com/blevesearch/go-faiss/index.go index b58a6149fe..58543f291d 100644 --- a/vendor/github.com/blevesearch/go-faiss/index.go +++ b/vendor/github.com/blevesearch/go-faiss/index.go @@ -44,6 +44,15 @@ type Index interface { // AddWithIDs is like Add, but stores xids instead of sequential IDs. AddWithIDs(x []float32, xids []int64) error + // Applicable only to IVF indexes: Return a map of centroid ID --> []vector IDs + // for the cluster. + ObtainClusterToVecIDsFromIVFIndex() (ids map[int64][]int64, err error) + + // Applicable only to IVF indexes: Returns the centroid IDs in decreasing order + // of proximity to query 'x' and their distance from 'x' + ObtainClustersWithDistancesFromIVFIndex(x []float32, centroidIDs []int64) ( + []int64, []float32, error) + // Search queries the index with the vectors in x. // Returns the IDs of the k nearest neighbors for each query vector and the // corresponding distances. @@ -52,6 +61,14 @@ type Index interface { SearchWithoutIDs(x []float32, k int64, exclude []int64, params json.RawMessage) (distances []float32, labels []int64, err error) + SearchWithIDs(x []float32, k int64, include []int64, params json.RawMessage) (distances []float32, + labels []int64, err error) + + // Applicable only to IVF indexes: Search clusters whose IDs are in eligibleCentroidIDs + SearchClustersFromIVFIndex(selector Selector, nvecs int, eligibleCentroidIDs []int64, + minEligibleCentroids int, k int64, x, centroidDis []float32, + params json.RawMessage) ([]float32, []int64, error) + Reconstruct(key int64) ([]float32, error) ReconstructBatch(keys []int64, recons []float32) ([]float32, error) @@ -123,6 +140,101 @@ func (idx *faissIndex) Add(x []float32) error { return nil } +func (idx *faissIndex) ObtainClusterToVecIDsFromIVFIndex() (map[int64][]int64, error) { + // This type assertion is required to determine whether to invoke + // ObtainClustersWithDistancesFromIVFIndex, SearchClustersFromIVFIndex or not. + if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx == nil { + return nil, nil + } + + clusterVectorIDMap := make(map[int64][]int64) + + nlist := C.faiss_IndexIVF_nlist(idx.idx) + for i := 0; i < int(nlist); i++ { + list_size := C.faiss_IndexIVF_get_list_size(idx.idx, C.size_t(i)) + invlist := make([]int64, list_size) + C.faiss_IndexIVF_invlists_get_ids(idx.idx, C.size_t(i), (*C.idx_t)(&invlist[0])) + clusterVectorIDMap[int64(i)] = invlist + } + + return clusterVectorIDMap, nil +} + +func (idx *faissIndex) ObtainClustersWithDistancesFromIVFIndex(x []float32, centroidIDs []int64) ( + []int64, []float32, error) { + // Selector to include only the centroids whose IDs are part of 'centroidIDs'. + includeSelector, err := NewIDSelectorBatch(centroidIDs) + if err != nil { + return nil, nil, err + } + defer includeSelector.Delete() + + params, err := NewSearchParams(idx, json.RawMessage{}, includeSelector.Get()) + if err != nil { + return nil, nil, err + } + + // Populate these with the centroids and their distances. + centroids := make([]int64, len(centroidIDs)) + centroidDistances := make([]float32, len(centroidIDs)) + + n := len(x) / idx.D() + + c := C.faiss_Search_closest_eligible_centroids(idx.idx, (C.int)(n), + (*C.float)(&x[0]), (C.int)(len(centroidIDs)), + (*C.float)(¢roidDistances[0]), (*C.idx_t)(¢roids[0]), params.sp) + if c != 0 { + return nil, nil, getLastError() + } + + return centroids, centroidDistances, nil +} + +func (idx *faissIndex) SearchClustersFromIVFIndex(selector Selector, nvecs int, + eligibleCentroidIDs []int64, minEligibleCentroids int, k int64, x, + centroidDis []float32, params json.RawMessage) ([]float32, []int64, error) { + defer selector.Delete() + + tempParams := defaultSearchParamsIVF{ + Nlist: len(eligibleCentroidIDs), + // Have to override nprobe so that more clusters will be searched for this + // query, if required. + Nprobe: minEligibleCentroids, + Nvecs: nvecs, + } + + searchParams, err := NewSearchParamsIVF(idx, params, selector.Get(), + tempParams) + if err != nil { + return nil, nil, err + } + + n := len(x) / idx.D() + + distances := make([]float32, int64(n)*k) + labels := make([]int64, int64(n)*k) + + effectiveNprobe := getNProbeFromSearchParams(searchParams) + eligibleCentroidIDs = eligibleCentroidIDs[:effectiveNprobe] + centroidDis = centroidDis[:effectiveNprobe] + + if c := C.faiss_IndexIVF_search_preassigned_with_params( + idx.idx, + (C.idx_t)(n), + (*C.float)(&x[0]), + (C.idx_t)(k), + (*C.idx_t)(&eligibleCentroidIDs[0]), + (*C.float)(¢roidDis[0]), + (*C.float)(&distances[0]), + (*C.idx_t)(&labels[0]), + (C.int)(0), + searchParams.sp); c != 0 { + return nil, nil, getLastError() + } + + return distances, labels, nil +} + func (idx *faissIndex) AddWithIDs(x []float32, xids []int64) error { n := len(x) / idx.D() if c := C.faiss_Index_add_with_ids( @@ -139,7 +251,6 @@ func (idx *faissIndex) AddWithIDs(x []float32, xids []int64) error { func (idx *faissIndex) Search(x []float32, k int64) ( distances []float32, labels []int64, err error, ) { - n := len(x) / idx.D() distances = make([]float32, int64(n)*k) labels = make([]int64, int64(n)*k) @@ -170,7 +281,7 @@ func (idx *faissIndex) SearchWithoutIDs(x []float32, k int64, exclude []int64, p if err != nil { return nil, nil, err } - selector = excludeSelector.sel + selector = excludeSelector.Get() defer excludeSelector.Delete() } @@ -185,6 +296,25 @@ func (idx *faissIndex) SearchWithoutIDs(x []float32, k int64, exclude []int64, p return } +func (idx *faissIndex) SearchWithIDs(x []float32, k int64, include []int64, + params json.RawMessage) (distances []float32, labels []int64, err error, +) { + includeSelector, err := NewIDSelectorBatch(include) + if err != nil { + return nil, nil, err + } + defer includeSelector.Delete() + + searchParams, err := NewSearchParams(idx, params, includeSelector.Get()) + if err != nil { + return nil, nil, err + } + defer searchParams.Delete() + + distances, labels, err = idx.searchWithParams(x, k, searchParams.sp) + return +} + func (idx *faissIndex) Reconstruct(key int64) (recons []float32, err error) { rv := make([]float32, idx.D()) if c := C.faiss_Index_reconstruct( diff --git a/vendor/github.com/blevesearch/go-faiss/index_ivf.go b/vendor/github.com/blevesearch/go-faiss/index_ivf.go index 2d84e4ab90..38f023aa90 100644 --- a/vendor/github.com/blevesearch/go-faiss/index_ivf.go +++ b/vendor/github.com/blevesearch/go-faiss/index_ivf.go @@ -51,3 +51,11 @@ func (idx *IndexImpl) SetNProbe(nprobe int32) { } C.faiss_IndexIVF_set_nprobe(ivfPtr, C.size_t(nprobe)) } + +func (idx *IndexImpl) GetNProbe() int32 { + ivfPtr := C.faiss_IndexIVF_cast(idx.cPtr()) + if ivfPtr == nil { + return 0 + } + return int32(C.faiss_IndexIVF_nprobe(ivfPtr)) +} diff --git a/vendor/github.com/blevesearch/go-faiss/search_params.go b/vendor/github.com/blevesearch/go-faiss/search_params.go index 17575d5fbb..cc49a6a5b2 100644 --- a/vendor/github.com/blevesearch/go-faiss/search_params.go +++ b/vendor/github.com/blevesearch/go-faiss/search_params.go @@ -28,6 +28,15 @@ type searchParamsIVF struct { MaxCodesPct float32 `json:"ivf_max_codes_pct,omitempty"` } +// IVF Parameters used to override the index-time defaults for a specific query. +// Serve as the 'new' defaults for this query, unless overridden by search-time +// params. +type defaultSearchParamsIVF struct { + Nprobe int `json:"ivf_nprobe,omitempty"` + Nlist int `json:"ivf_nlist,omitempty"` + Nvecs int `json:"ivf_nvecs,omitempty"` +} + func (s *searchParamsIVF) Validate() error { if s.NprobePct < 0 || s.NprobePct > 100 { return fmt.Errorf("invalid IVF search params, ivf_nprobe_pct:%v, "+ @@ -42,6 +51,70 @@ func (s *searchParamsIVF) Validate() error { return nil } +func getNProbeFromSearchParams(params *SearchParams) int32 { + return int32(C.faiss_SearchParametersIVF_nprobe(params.sp)) +} + +func NewSearchParamsIVF(idx Index, params json.RawMessage, sel *C.FaissIDSelector, + defaultParams defaultSearchParamsIVF) (*SearchParams, error) { + rv := &SearchParams{} + if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx != nil { + rv.sp = C.faiss_SearchParametersIVF_cast(rv.sp) + if len(params) == 0 && sel == nil { + return rv, nil + } + + var nprobe, maxCodes, nlist int + nlist = int(C.faiss_IndexIVF_nlist(ivfIdx)) + // It's important to set nprobe to the value decided at the time of + // index creation. Otherwise, nprobe will be set to the default + // value of 1. + nprobe = int(C.faiss_IndexIVF_nprobe(ivfIdx)) + + nvecs := idx.Ntotal() + if defaultParams.Nvecs > 0 { + nvecs = int64(defaultParams.Nvecs) + } + if defaultParams.Nlist > 0 { + nlist = defaultParams.Nlist + } + if defaultParams.Nprobe > 0 { + nprobe = defaultParams.Nprobe + } + + var ivfParams searchParamsIVF + if len(params) > 0 { + if err := json.Unmarshal(params, &ivfParams); err != nil { + return rv, fmt.Errorf("failed to unmarshal IVF search params, "+ + "err:%v", err) + } + if err := ivfParams.Validate(); err != nil { + return rv, err + } + } + + if ivfParams.NprobePct > 0 { + // in the situation when the calculated nprobe happens to be + // between 0 and 1, we'll round it up. + nprobe = max(int(float32(nlist)*(ivfParams.NprobePct/100)), 1) + } + + if ivfParams.MaxCodesPct > 0 { + maxCodes = int(float32(nvecs) * (ivfParams.MaxCodesPct / 100)) + } // else, maxCodes will be set to the default value of 0, which means no limit + + if c := C.faiss_SearchParametersIVF_new_with( + &rv.sp, + sel, + C.size_t(nprobe), + C.size_t(maxCodes), + ); c != 0 { + return rv, fmt.Errorf("failed to create faiss IVF search params") + } + } + return rv, nil +} + // Always return a valid SearchParams object, // thus caller must clean up the object // by invoking Delete() method, even if an error is returned. @@ -52,29 +125,33 @@ func NewSearchParams(idx Index, params json.RawMessage, sel *C.FaissIDSelector, return rv, fmt.Errorf("failed to create faiss search params") } - // # check if the index is IVF and set the search params + // check if the index is IVF and set the search params if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx != nil { rv.sp = C.faiss_SearchParametersIVF_cast(rv.sp) - if len(params) == 0 { + if len(params) == 0 && sel == nil { return rv, nil } var ivfParams searchParamsIVF - if err := json.Unmarshal(params, &ivfParams); err != nil { - return rv, fmt.Errorf("failed to unmarshal IVF search params, "+ - "err:%v", err) - } - if err := ivfParams.Validate(); err != nil { - return rv, err + if len(params) > 0 { + if err := json.Unmarshal(params, &ivfParams); err != nil { + return rv, fmt.Errorf("failed to unmarshal IVF search params, "+ + "err:%v", err) + } + if err := ivfParams.Validate(); err != nil { + return rv, err + } } var nprobe, maxCodes int if ivfParams.NprobePct > 0 { nlist := float32(C.faiss_IndexIVF_nlist(ivfIdx)) - nprobe = int(nlist * (ivfParams.NprobePct / 100)) + // in the situation when the calculated nprobe happens to be + // between 0 and 1, we'll round it up. + nprobe = max(int(nlist*(ivfParams.NprobePct/100)), 1) } else { - // It's important to set nprobe to the value decided at the time of + // it's important to set nprobe to the value decided at the time of // index creation. Otherwise, nprobe will be set to the default // value of 1. nprobe = int(C.faiss_IndexIVF_nprobe(ivfIdx)) diff --git a/vendor/github.com/blevesearch/go-faiss/selector.go b/vendor/github.com/blevesearch/go-faiss/selector.go index d372006b96..8e95c4618f 100644 --- a/vendor/github.com/blevesearch/go-faiss/selector.go +++ b/vendor/github.com/blevesearch/go-faiss/selector.go @@ -5,6 +5,11 @@ package faiss */ import "C" +type Selector interface { + Get() *C.FaissIDSelector + Delete() +} + // IDSelector represents a set of IDs to remove. type IDSelector struct { sel *C.FaissIDSelector @@ -19,13 +24,17 @@ func (s *IDSelector) Delete() { C.faiss_IDSelector_free(s.sel) } -type IDSelectorBatch struct { +func (s *IDSelector) Get() *C.FaissIDSelector { + return s.sel +} + +type IDSelectorNot struct { sel *C.FaissIDSelector batchSel *C.FaissIDSelector } // Delete frees the memory associated with s. -func (s *IDSelectorBatch) Delete() { +func (s *IDSelectorNot) Delete() { if s == nil { return } @@ -38,8 +47,12 @@ func (s *IDSelectorBatch) Delete() { } } +func (s *IDSelectorNot) Get() *C.FaissIDSelector { + return s.sel +} + // NewIDSelectorRange creates a selector that removes IDs on [imin, imax). -func NewIDSelectorRange(imin, imax int64) (*IDSelector, error) { +func NewIDSelectorRange(imin, imax int64) (Selector, error) { var sel *C.FaissIDSelectorRange c := C.faiss_IDSelectorRange_new(&sel, C.idx_t(imin), C.idx_t(imax)) if c != 0 { @@ -49,7 +62,7 @@ func NewIDSelectorRange(imin, imax int64) (*IDSelector, error) { } // NewIDSelectorBatch creates a new batch selector. -func NewIDSelectorBatch(indices []int64) (*IDSelector, error) { +func NewIDSelectorBatch(indices []int64) (Selector, error) { var sel *C.FaissIDSelectorBatch if c := C.faiss_IDSelectorBatch_new( &sel, @@ -61,9 +74,9 @@ func NewIDSelectorBatch(indices []int64) (*IDSelector, error) { return &IDSelector{(*C.FaissIDSelector)(sel)}, nil } -// NewIDSelectorNot creates a new Not selector, wrapped arround a +// NewIDSelectorNot creates a new Not selector, wrapped around a // batch selector, with the IDs in 'exclude'. -func NewIDSelectorNot(exclude []int64) (*IDSelectorBatch, error) { +func NewIDSelectorNot(exclude []int64) (Selector, error) { batchSelector, err := NewIDSelectorBatch(exclude) if err != nil { return nil, err @@ -72,10 +85,11 @@ func NewIDSelectorNot(exclude []int64) (*IDSelectorBatch, error) { var sel *C.FaissIDSelectorNot if c := C.faiss_IDSelectorNot_new( &sel, - batchSelector.sel, + batchSelector.Get(), ); c != 0 { batchSelector.Delete() return nil, getLastError() } - return &IDSelectorBatch{sel: (*C.FaissIDSelector)(sel), batchSel: batchSelector.sel}, nil + return &IDSelectorNot{sel: (*C.FaissIDSelector)(sel), + batchSel: batchSelector.Get()}, nil } diff --git a/vendor/github.com/blevesearch/scorch_segment_api/v2/segment_vector.go b/vendor/github.com/blevesearch/scorch_segment_api/v2/segment_vector.go index 3af08e25e8..77b0362120 100644 --- a/vendor/github.com/blevesearch/scorch_segment_api/v2/segment_vector.go +++ b/vendor/github.com/blevesearch/scorch_segment_api/v2/segment_vector.go @@ -59,13 +59,17 @@ type VecPostingsIterator interface { type VectorIndex interface { // @params: Search params for backing vector index (like IVF, HNSW, etc.) Search(qVector []float32, k int64, params json.RawMessage) (VecPostingsList, error) + // @eligibleDocIDs: DocIDs in the segment eligible for the kNN query. + SearchWithFilter(qVector []float32, k int64, eligibleDocIDs []uint64, + params json.RawMessage) (VecPostingsList, error) Close() Size() uint64 } type VectorSegment interface { Segment - InterpretVectorIndex(field string, except *roaring.Bitmap) (VectorIndex, error) + InterpretVectorIndex(field string, requiresFiltering bool, except *roaring.Bitmap) ( + VectorIndex, error) } type VecPosting interface { diff --git a/vendor/github.com/blevesearch/zapx/v15/chunk.go b/vendor/github.com/blevesearch/zapx/v15/chunk.go index 4307d0ed29..53d124f063 100644 --- a/vendor/github.com/blevesearch/zapx/v15/chunk.go +++ b/vendor/github.com/blevesearch/zapx/v15/chunk.go @@ -15,6 +15,7 @@ package zap import ( + "errors" "fmt" ) @@ -26,8 +27,18 @@ var LegacyChunkMode uint32 = 1024 // be used by default. var DefaultChunkMode uint32 = 1026 +var ErrChunkSizeZero = errors.New("chunk size is zero") + +// getChunkSize returns the chunk size for the given chunkMode, cardinality, and +// maxDocs. +// +// In error cases, the returned chunk size will be 0. Caller can differentiate +// between a valid chunk size of 0 and an error by checking for ErrChunkSizeZero. func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, error) { switch { + case chunkMode == 0: + return 0, ErrChunkSizeZero + // any chunkMode <= 1024 will always chunk with chunkSize=chunkMode case chunkMode <= 1024: // legacy chunk size @@ -46,6 +57,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, // chunk-size items. // no attempt is made to tweak any other case if cardinality <= 1024 { + if maxDocs == 0 { + return 0, ErrChunkSizeZero + } return maxDocs, nil } return 1024, nil @@ -61,6 +75,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, // 2. convert to chunkSize, dividing into maxDocs numChunks := (cardinality / 1024) + 1 chunkSize := maxDocs / numChunks + if chunkSize == 0 { + return 0, ErrChunkSizeZero + } return chunkSize, nil } return 0, fmt.Errorf("unknown chunk mode %d", chunkMode) diff --git a/vendor/github.com/blevesearch/zapx/v15/contentcoder.go b/vendor/github.com/blevesearch/zapx/v15/contentcoder.go index cd8b3fc86f..3343d31795 100644 --- a/vendor/github.com/blevesearch/zapx/v15/contentcoder.go +++ b/vendor/github.com/blevesearch/zapx/v15/contentcoder.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "io" "reflect" - "sync/atomic" "github.com/golang/snappy" ) @@ -37,7 +36,7 @@ var ( ) type chunkedContentCoder struct { - bytesWritten uint64 // atomic access to this variable, moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS. + bytesWritten uint64 // moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS. final []byte chunkSize uint64 @@ -112,11 +111,11 @@ func (c *chunkedContentCoder) Close() error { } func (c *chunkedContentCoder) incrementBytesWritten(val uint64) { - atomic.AddUint64(&c.bytesWritten, val) + c.bytesWritten += val } func (c *chunkedContentCoder) getBytesWritten() uint64 { - return atomic.LoadUint64(&c.bytesWritten) + return c.bytesWritten } func (c *chunkedContentCoder) flushContents() error { diff --git a/vendor/github.com/blevesearch/zapx/v15/docvalues.go b/vendor/github.com/blevesearch/zapx/v15/docvalues.go index 046244d130..9c306a8d77 100644 --- a/vendor/github.com/blevesearch/zapx/v15/docvalues.go +++ b/vendor/github.com/blevesearch/zapx/v15/docvalues.go @@ -75,7 +75,6 @@ type docValueReader struct { curChunkData []byte // compressed data cache uncompressed []byte // temp buf for snappy decompression - // atomic access to this variable bytesRead uint64 } diff --git a/vendor/github.com/blevesearch/zapx/v15/intDecoder.go b/vendor/github.com/blevesearch/zapx/v15/intDecoder.go index e50c471722..1a69e61450 100644 --- a/vendor/github.com/blevesearch/zapx/v15/intDecoder.go +++ b/vendor/github.com/blevesearch/zapx/v15/intDecoder.go @@ -27,7 +27,6 @@ type chunkedIntDecoder struct { data []byte r *memUvarintReader - // atomic access to this variable bytesRead uint64 } diff --git a/vendor/github.com/blevesearch/zapx/v15/intcoder.go b/vendor/github.com/blevesearch/zapx/v15/intcoder.go index 2957fbd098..e1586edc46 100644 --- a/vendor/github.com/blevesearch/zapx/v15/intcoder.go +++ b/vendor/github.com/blevesearch/zapx/v15/intcoder.go @@ -18,7 +18,6 @@ import ( "bytes" "encoding/binary" "io" - "sync/atomic" ) // We can safely use 0 to represent termNotEncoded since 0 @@ -36,7 +35,6 @@ type chunkedIntCoder struct { buf []byte - // atomic access to this variable bytesWritten uint64 } @@ -79,11 +77,11 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) { } func (c *chunkedIntCoder) incrementBytesWritten(val uint64) { - atomic.AddUint64(&c.bytesWritten, val) + c.bytesWritten += val } func (c *chunkedIntCoder) getBytesWritten() uint64 { - return atomic.LoadUint64(&c.bytesWritten) + return c.bytesWritten } // Add encodes the provided integers into the correct chunk for the provided diff --git a/vendor/github.com/blevesearch/zapx/v15/merge.go b/vendor/github.com/blevesearch/zapx/v15/merge.go index fa406e6dbd..63ff2089b7 100644 --- a/vendor/github.com/blevesearch/zapx/v15/merge.go +++ b/vendor/github.com/blevesearch/zapx/v15/merge.go @@ -591,6 +591,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo use1HitEncoding func(uint64) (bool, uint64, uint64), w *CountHashWriter, bufMaxVarintLen64 []byte) ( offset uint64, err error) { + if postings == nil { + return 0, nil + } + termCardinality := postings.GetCardinality() if termCardinality <= 0 { return 0, nil diff --git a/vendor/github.com/blevesearch/zapx/v15/new.go b/vendor/github.com/blevesearch/zapx/v15/new.go index f659a5d08b..869d1b53f8 100644 --- a/vendor/github.com/blevesearch/zapx/v15/new.go +++ b/vendor/github.com/blevesearch/zapx/v15/new.go @@ -147,7 +147,6 @@ type interim struct { lastNumDocs int lastOutSize int - // atomic access to this variable bytesWritten uint64 } @@ -497,11 +496,11 @@ func (s *interim) processDocument(docNum uint64, } func (s *interim) getBytesWritten() uint64 { - return atomic.LoadUint64(&s.bytesWritten) + return s.bytesWritten } func (s *interim) incrementBytesWritten(val uint64) { - atomic.AddUint64(&s.bytesWritten, val) + s.bytesWritten += val } func (s *interim) writeStoredFields() ( @@ -667,7 +666,11 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err locs := s.Locs[pid] locOffset := 0 - chunkSize, err := getChunkSize(s.chunkMode, postingsBS.GetCardinality(), uint64(len(s.results))) + var cardinality uint64 + if postingsBS != nil { + cardinality = postingsBS.GetCardinality() + } + chunkSize, err := getChunkSize(s.chunkMode, cardinality, uint64(len(s.results))) if err != nil { return 0, nil, err } diff --git a/vendor/github.com/blevesearch/zapx/v15/posting.go b/vendor/github.com/blevesearch/zapx/v15/posting.go index ad47df0dd6..07ae202f6d 100644 --- a/vendor/github.com/blevesearch/zapx/v15/posting.go +++ b/vendor/github.com/blevesearch/zapx/v15/posting.go @@ -305,10 +305,7 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { chunkSize, err := getChunkSize(d.sb.chunkMode, rv.postings.GetCardinality(), d.sb.numDocs) if err != nil { - return err - } else if chunkSize == 0 { - return fmt.Errorf("chunk size is zero, chunkMode: %v, numDocs: %v", - d.sb.chunkMode, d.sb.numDocs) + return fmt.Errorf("failed to get chunk size: %v", err) } rv.chunkSize = chunkSize @@ -632,6 +629,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool, return i.nextDocNumAtOrAfterClean(atOrAfter) } + if i.postings.chunkSize == 0 { + return 0, false, ErrChunkSizeZero + } + i.Actual.AdvanceIfNeeded(uint32(atOrAfter)) if !i.Actual.HasNext() || !i.all.HasNext() { @@ -741,6 +742,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfterClean( return uint64(i.Actual.Next()), true, nil } + if i.postings != nil && i.postings.chunkSize == 0 { + return 0, false, ErrChunkSizeZero + } + // freq-norm's needed, so maintain freq-norm chunk reader sameChunkNexts := 0 // # of times we called Next() in the same chunk n := i.Actual.Next() diff --git a/vendor/github.com/blevesearch/zapx/v16/build.go b/vendor/github.com/blevesearch/zapx/v16/build.go index 99635739fc..53fd34d12c 100644 --- a/vendor/github.com/blevesearch/zapx/v16/build.go +++ b/vendor/github.com/blevesearch/zapx/v16/build.go @@ -157,24 +157,24 @@ func persistStoredFieldValues(fieldID int, return curr, data, nil } -func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, - fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, - storedIndexOffset uint64, dictLocs []uint64, - sectionsIndexOffset uint64) (*SegmentBase, error) { +func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64, + storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) { sb := &SegmentBase{ mem: mem, memCRC: memCRC, chunkMode: chunkMode, - fieldsMap: fieldsMap, numDocs: numDocs, storedIndexOffset: storedIndexOffset, fieldsIndexOffset: sectionsIndexOffset, sectionsIndexOffset: sectionsIndexOffset, fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), docValueOffset: 0, // docValueOffsets identified automatically by the section - dictLocs: dictLocs, fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), + // following fields gets populated by loadFieldsNew + fieldsMap: make(map[string]uint16), + dictLocs: make([]uint64, 0), + fieldsInv: make([]string, 0), } sb.updateSize() diff --git a/vendor/github.com/blevesearch/zapx/v16/chunk.go b/vendor/github.com/blevesearch/zapx/v16/chunk.go index 4307d0ed29..53d124f063 100644 --- a/vendor/github.com/blevesearch/zapx/v16/chunk.go +++ b/vendor/github.com/blevesearch/zapx/v16/chunk.go @@ -15,6 +15,7 @@ package zap import ( + "errors" "fmt" ) @@ -26,8 +27,18 @@ var LegacyChunkMode uint32 = 1024 // be used by default. var DefaultChunkMode uint32 = 1026 +var ErrChunkSizeZero = errors.New("chunk size is zero") + +// getChunkSize returns the chunk size for the given chunkMode, cardinality, and +// maxDocs. +// +// In error cases, the returned chunk size will be 0. Caller can differentiate +// between a valid chunk size of 0 and an error by checking for ErrChunkSizeZero. func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, error) { switch { + case chunkMode == 0: + return 0, ErrChunkSizeZero + // any chunkMode <= 1024 will always chunk with chunkSize=chunkMode case chunkMode <= 1024: // legacy chunk size @@ -46,6 +57,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, // chunk-size items. // no attempt is made to tweak any other case if cardinality <= 1024 { + if maxDocs == 0 { + return 0, ErrChunkSizeZero + } return maxDocs, nil } return 1024, nil @@ -61,6 +75,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, // 2. convert to chunkSize, dividing into maxDocs numChunks := (cardinality / 1024) + 1 chunkSize := maxDocs / numChunks + if chunkSize == 0 { + return 0, ErrChunkSizeZero + } return chunkSize, nil } return 0, fmt.Errorf("unknown chunk mode %d", chunkMode) diff --git a/vendor/github.com/blevesearch/zapx/v16/contentcoder.go b/vendor/github.com/blevesearch/zapx/v16/contentcoder.go index cd8b3fc86f..3343d31795 100644 --- a/vendor/github.com/blevesearch/zapx/v16/contentcoder.go +++ b/vendor/github.com/blevesearch/zapx/v16/contentcoder.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "io" "reflect" - "sync/atomic" "github.com/golang/snappy" ) @@ -37,7 +36,7 @@ var ( ) type chunkedContentCoder struct { - bytesWritten uint64 // atomic access to this variable, moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS. + bytesWritten uint64 // moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS. final []byte chunkSize uint64 @@ -112,11 +111,11 @@ func (c *chunkedContentCoder) Close() error { } func (c *chunkedContentCoder) incrementBytesWritten(val uint64) { - atomic.AddUint64(&c.bytesWritten, val) + c.bytesWritten += val } func (c *chunkedContentCoder) getBytesWritten() uint64 { - return atomic.LoadUint64(&c.bytesWritten) + return c.bytesWritten } func (c *chunkedContentCoder) flushContents() error { diff --git a/vendor/github.com/blevesearch/zapx/v16/docvalues.go b/vendor/github.com/blevesearch/zapx/v16/docvalues.go index 6fb7a9a20b..3d0d269f63 100644 --- a/vendor/github.com/blevesearch/zapx/v16/docvalues.go +++ b/vendor/github.com/blevesearch/zapx/v16/docvalues.go @@ -75,7 +75,6 @@ type docValueReader struct { curChunkData []byte // compressed data cache uncompressed []byte // temp buf for snappy decompression - // atomic access to this variable bytesRead uint64 } diff --git a/vendor/github.com/blevesearch/zapx/v16/faiss_vector_cache.go b/vendor/github.com/blevesearch/zapx/v16/faiss_vector_cache.go index 893da2d5f2..f4aa6087ae 100644 --- a/vendor/github.com/blevesearch/zapx/v16/faiss_vector_cache.go +++ b/vendor/github.com/blevesearch/zapx/v16/faiss_vector_cache.go @@ -52,54 +52,97 @@ func (vc *vectorIndexCache) Clear() { vc.m.Unlock() } -func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { - var found bool - index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except) - if !found { - index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except) - } - return index, vecDocIDMap, vecIDsToExclude, err +// loadDocVecIDMap indicates if a non-nil docVecIDMap should be returned. +// It is true when a filtered kNN query accesses the cache since it requires the +// map. It's false otherwise. +func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, + loadDocVecIDMap bool, except *roaring.Bitmap) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, + vecIDsToExclude []int64, err error) { + index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache( + fieldID, loadDocVecIDMap, mem, except) + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err } -func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, found bool) { +// function to load the vectorDocIDMap and if required, docVecIDMap from cache +// If not, it will create these and add them to the cache. +func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool, + mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { + vc.m.RLock() - defer vc.m.RUnlock() - entry, ok := vc.cache[fieldID] - if !ok { - return nil, nil, nil, false - } - - index, vecDocIDMap = entry.load() - vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except) - - return index, vecDocIDMap, vecIDsToExclude, true -} - -func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { - vc.m.Lock() - defer vc.m.Unlock() - - // when there are multiple threads trying to build the index, guard redundant - // index creation by doing a double check and return if already created and - // cached. entry, ok := vc.cache[fieldID] if ok { - index, vecDocIDMap = entry.load() + index, vecDocIDMap, docVecIDMap = entry.load() vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except) - return index, vecDocIDMap, vecIDsToExclude, nil + if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) { + vc.m.RUnlock() + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil + } + + vc.m.RUnlock() + vc.m.Lock() + // in cases where only the docVecID isn't part of the cache, build it and + // add it to the cache, while holding a lock to avoid concurrent modifications. + // typically seen for the first filtered query. + docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry) + vc.m.Unlock() + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil } - // if the cache doesn't have entry, construct the vector to doc id map and the - // vector index out of the mem bytes and update the cache under lock. + vc.m.RUnlock() + // acquiring a lock since this is modifying the cache. + vc.m.Lock() + defer vc.m.Unlock() + return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except) +} + +func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 { + // Handle concurrent accesses (to avoid unnecessary work) by adding a + // check within the write lock here. + if ce.docVecIDMap != nil { + return ce.docVecIDMap + } + + docVecIDMap := make(map[uint32][]int64) + for vecID, docID := range ce.vecDocIDMap { + docVecIDMap[docID] = append(docVecIDMap[docID], vecID) + } + + ce.docVecIDMap = docVecIDMap + return docVecIDMap +} + +// Rebuilding the cache on a miss. +func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, + loadDocVecIDMap bool, except *roaring.Bitmap) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { + + // Handle concurrent accesses (to avoid unnecessary work) by adding a + // check within the write lock here. + entry := vc.cache[fieldID] + if entry != nil { + index, vecDocIDMap, docVecIDMap = entry.load() + vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except) + if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) { + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil + } + docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry) + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil + } + + // if the cache doesn't have the entry, construct the vector to doc id map and + // the vector index out of the mem bytes and update the cache under lock. pos := 0 numVecs, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += n vecDocIDMap = make(map[int64]uint32, numVecs) + if loadDocVecIDMap { + docVecIDMap = make(map[uint32][]int64, numVecs) + } isExceptNotEmpty := except != nil && !except.IsEmpty() for i := 0; i < int(numVecs); i++ { vecID, n := binary.Varint(mem[pos : pos+binary.MaxVarintLen64]) @@ -113,6 +156,9 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r continue } vecDocIDMap[vecID] = docIDUint32 + if loadDocVecIDMap { + docVecIDMap[docIDUint32] = append(docVecIDMap[docIDUint32], vecID) + } } indexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) @@ -120,15 +166,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - vc.insertLOCKED(fieldID, index, vecDocIDMap) - return index, vecDocIDMap, vecIDsToExclude, nil + vc.insertLOCKED(fieldID, index, vecDocIDMap, loadDocVecIDMap, docVecIDMap) + return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil } func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) { + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, loadDocVecIDMap bool, + docVecIDMap map[uint32][]int64) { // the first time we've hit the cache, try to spawn a monitoring routine // which will reconcile the moving averages for all the fields being hit if len(vc.cache) == 0 { @@ -142,7 +189,8 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, // this makes the average to be kept above the threshold value for a // longer time and thereby the index to be resident in the cache // for longer time. - vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap, 0.4) + vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap, + loadDocVecIDMap, docVecIDMap, 0.4) } } @@ -235,8 +283,9 @@ func (e *ewma) add(val uint64) { // ----------------------------------------------------------------------------- -func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, alpha float64) *cacheEntry { - return &cacheEntry{ +func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + loadDocVecIDMap bool, docVecIDMap map[uint32][]int64, alpha float64) *cacheEntry { + ce := &cacheEntry{ index: index, vecDocIDMap: vecDocIDMap, tracker: &ewma{ @@ -245,6 +294,10 @@ func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, alph }, refs: 1, } + if loadDocVecIDMap { + ce.docVecIDMap = docVecIDMap + } + return ce } type cacheEntry struct { @@ -257,6 +310,7 @@ type cacheEntry struct { index *faiss.IndexImpl vecDocIDMap map[int64]uint32 + docVecIDMap map[uint32][]int64 } func (ce *cacheEntry) incHit() { @@ -271,10 +325,10 @@ func (ce *cacheEntry) decRef() { atomic.AddInt64(&ce.refs, -1) } -func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32) { +func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32, map[uint32][]int64) { ce.incHit() ce.addRef() - return ce.index, ce.vecDocIDMap + return ce.index, ce.vecDocIDMap, ce.docVecIDMap } func (ce *cacheEntry) close() { @@ -282,6 +336,7 @@ func (ce *cacheEntry) close() { ce.index.Close() ce.index = nil ce.vecDocIDMap = nil + ce.docVecIDMap = nil }() } diff --git a/vendor/github.com/blevesearch/zapx/v16/faiss_vector_posting.go b/vendor/github.com/blevesearch/zapx/v16/faiss_vector_posting.go index e4275d76c5..6b9840fef0 100644 --- a/vendor/github.com/blevesearch/zapx/v16/faiss_vector_posting.go +++ b/vendor/github.com/blevesearch/zapx/v16/faiss_vector_posting.go @@ -134,12 +134,15 @@ func (p *VecPostingsList) Size() int { } func (p *VecPostingsList) Count() uint64 { - n := p.postings.GetCardinality() - var e uint64 - if p.except != nil { - e = p.postings.AndCardinality(p.except) + if p.postings != nil { + n := p.postings.GetCardinality() + var e uint64 + if p.except != nil { + e = p.postings.AndCardinality(p.except) + } + return n - e } - return n - e + return 0 } func (vpl *VecPostingsList) ResetBytesRead(val uint64) { @@ -267,16 +270,26 @@ func (vpl *VecPostingsIterator) BytesWritten() uint64 { // vectorIndexWrapper conforms to scorch_segment_api's VectorIndex interface type vectorIndexWrapper struct { - search func(qVector []float32, k int64, params json.RawMessage) (segment.VecPostingsList, error) - close func() - size func() uint64 + search func(qVector []float32, k int64, + params json.RawMessage) (segment.VecPostingsList, error) + searchWithFilter func(qVector []float32, k int64, eligibleDocIDs []uint64, + params json.RawMessage) (segment.VecPostingsList, error) + close func() + size func() uint64 } -func (i *vectorIndexWrapper) Search(qVector []float32, k int64, params json.RawMessage) ( +func (i *vectorIndexWrapper) Search(qVector []float32, k int64, + params json.RawMessage) ( segment.VecPostingsList, error) { return i.search(qVector, k, params) } +func (i *vectorIndexWrapper) SearchWithFilter(qVector []float32, k int64, + eligibleDocIDs []uint64, params json.RawMessage) ( + segment.VecPostingsList, error) { + return i.searchWithFilter(qVector, k, eligibleDocIDs, params) +} + func (i *vectorIndexWrapper) Close() { i.close() } @@ -288,20 +301,40 @@ func (i *vectorIndexWrapper) Size() uint64 { // InterpretVectorIndex returns a construct of closures (vectorIndexWrapper) // that will allow the caller to - // (1) search within an attached vector index -// (2) close attached vector index -// (3) get the size of the attached vector index -func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap) ( +// (2) search limited to a subset of documents within an attached vector index +// (3) close attached vector index +// (4) get the size of the attached vector index +func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool, + except *roaring.Bitmap) ( segment.VectorIndex, error) { // Params needed for the closures var vecIndex *faiss.IndexImpl var vecDocIDMap map[int64]uint32 + var docVecIDMap map[uint32][]int64 var vectorIDsToExclude []int64 var fieldIDPlus1 uint16 var vecIndexSize uint64 + // Utility function to add the corresponding docID and scores for each vector + // returned after the kNN query to the newly + // created vecPostingsList + addIDsToPostingsList := func(pl *VecPostingsList, ids []int64, scores []float32) { + for i := 0; i < len(ids); i++ { + vecID := ids[i] + // Checking if it's present in the vecDocIDMap. + // If -1 is returned as an ID(insufficient vectors), this will ensure + // it isn't added to the final postings list. + if docID, ok := vecDocIDMap[vecID]; ok { + code := getVectorCode(docID, scores[i]) + pl.postings.Add(uint64(code)) + } + } + } + var ( wrapVecIndex = &vectorIndexWrapper{ - search: func(qVector []float32, k int64, params json.RawMessage) (segment.VecPostingsList, error) { + search: func(qVector []float32, k int64, params json.RawMessage) ( + segment.VecPostingsList, error) { // 1. returned postings list (of type PostingsList) has two types of information - docNum and its score. // 2. both the values can be represented using roaring bitmaps. // 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings. @@ -318,23 +351,204 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap return rv, nil } - scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, vectorIDsToExclude, params) + scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, + vectorIDsToExclude, params) if err != nil { return nil, err } - // for every similar vector returned by the Search() API, add the corresponding - // docID and the score to the newly created vecPostingsList - for i := 0; i < len(ids); i++ { - vecID := ids[i] - // Checking if it's present in the vecDocIDMap. - // If -1 is returned as an ID(insufficient vectors), this will ensure - // it isn't added to the final postings list. - if docID, ok := vecDocIDMap[vecID]; ok { - code := getVectorCode(docID, scores[i]) - rv.postings.Add(uint64(code)) - } + + addIDsToPostingsList(rv, ids, scores) + + return rv, nil + }, + searchWithFilter: func(qVector []float32, k int64, + eligibleDocIDs []uint64, params json.RawMessage) ( + segment.VecPostingsList, error) { + // 1. returned postings list (of type PostingsList) has two types of information - docNum and its score. + // 2. both the values can be represented using roaring bitmaps. + // 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings. + // 4. VecPostings would just have the docNum and the score. Every call of Next() + // and Advance just returns the next VecPostings. The caller would do a vp.Number() + // and the Score() to get the corresponding values + rv := &VecPostingsList{ + except: nil, // todo: handle the except bitmap within postings iterator. + postings: roaring64.New(), } + if vecIndex == nil || vecIndex.D() != len(qVector) { + // vector index not found or dimensionality mismatched + return rv, nil + } + + if len(eligibleDocIDs) > 0 { + // Non-zero documents eligible per the filter query. + + // If every element in the index is eligible(eg. high selectivity + // cases), then this can basically be considered unfiltered kNN. + if len(eligibleDocIDs) == int(sb.numDocs) { + scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, + vectorIDsToExclude, params) + if err != nil { + return nil, err + } + + addIDsToPostingsList(rv, ids, scores) + return rv, nil + } + + // vector IDs corresponding to the local doc numbers to be + // considered for the search + vectorIDsToInclude := make([]int64, 0, len(eligibleDocIDs)) + for _, id := range eligibleDocIDs { + vectorIDsToInclude = append(vectorIDsToInclude, docVecIDMap[uint32(id)]...) + } + + // Retrieve the mapping of centroid IDs to vectors within + // the cluster. + clusterAssignment, _ := vecIndex.ObtainClusterToVecIDsFromIVFIndex() + // Accounting for a flat index + if len(clusterAssignment) == 0 { + scores, ids, err := vecIndex.SearchWithIDs(qVector, k, + vectorIDsToInclude, params) + if err != nil { + return nil, err + } + + addIDsToPostingsList(rv, ids, scores) + return rv, nil + } + + // Converting to roaring bitmap for ease of intersect ops with + // the set of eligible doc IDs. + centroidVecIDMap := make(map[int64]*roaring.Bitmap) + for centroidID, vecIDs := range clusterAssignment { + if _, exists := centroidVecIDMap[centroidID]; !exists { + centroidVecIDMap[centroidID] = roaring.NewBitmap() + } + vecIDsUint32 := make([]uint32, 0, len(vecIDs)) + for _, vecID := range vecIDs { + vecIDsUint32 = append(vecIDsUint32, uint32(vecID)) + } + centroidVecIDMap[centroidID].AddMany(vecIDsUint32) + } + + // Determining which clusters, identified by centroid ID, + // have at least one eligible vector and hence, ought to be + // probed. + eligibleCentroidIDs := make([]int64, 0) + + var selector faiss.Selector + var err error + // If there are more elements to be included than excluded, it + // might be quicker to use an exclusion selector as a filter + // instead of an inclusion selector. + if float32(len(eligibleDocIDs))/float32(len(docVecIDMap)) > 0.5 { + ineligibleVecIDsBitmap := roaring.NewBitmap() + eligibleDocIDsMap := make(map[uint64]struct{}) + for _, eligibleDocID := range eligibleDocIDs { + eligibleDocIDsMap[(eligibleDocID)] = struct{}{} + } + + ineligibleVectorIDs := make([]int64, 0, len(vecDocIDMap)- + len(vectorIDsToInclude)) + + for docID, vecIDs := range docVecIDMap { + if _, exists := eligibleDocIDsMap[uint64(docID)]; !exists { + for _, vecID := range vecIDs { + ineligibleVecIDsBitmap.Add(uint32(vecID)) + ineligibleVectorIDs = append(ineligibleVectorIDs, vecID) + } + } + } + + for centroidID, vecIDs := range centroidVecIDMap { + vecIDs.AndNot(ineligibleVecIDsBitmap) + // At least one eligible vec in cluster. + if !vecIDs.IsEmpty() { + // The mapping is now reduced to those vectors which + // are also eligible docs for the filter query. + centroidVecIDMap[centroidID] = vecIDs + eligibleCentroidIDs = append(eligibleCentroidIDs, centroidID) + } else { + // don't consider clusters with no eligible IDs. + delete(centroidVecIDMap, centroidID) + } + } + + selector, err = faiss.NewIDSelectorNot(ineligibleVectorIDs) + } else { + // Getting the vector IDs corresponding to the eligible + // doc IDs. + // The docVecIDMap maps each docID to vectorIDs corresponding + // to it. + // Usually, each docID has one vecID mapped to it unless + // the vector is nested, in which case there can be multiple + // vectorIDs mapped to the same docID. + // Eg. docID d1 -> vecID v1, for the first case + // d1 -> {v1,v2}, for the second case. + eligibleVecIDsBitmap := roaring.NewBitmap() + vecIDsUint32 := make([]uint32, 0) + for _, eligibleDocID := range eligibleDocIDs { + vecIDs := docVecIDMap[uint32(eligibleDocID)] + for _, vecID := range vecIDs { + vecIDsUint32 = append(vecIDsUint32, uint32(vecID)) + } + } + eligibleVecIDsBitmap.AddMany(vecIDsUint32) + for centroidID, vecIDs := range centroidVecIDMap { + vecIDs.And(eligibleVecIDsBitmap) + if !vecIDs.IsEmpty() { + // The mapping is now reduced to those vectors which + // are also eligible docs for the filter query. + centroidVecIDMap[centroidID] = vecIDs + eligibleCentroidIDs = append(eligibleCentroidIDs, centroidID) + } else { + // don't consider clusters with no eligible IDs. + delete(centroidVecIDMap, centroidID) + } + } + + selector, err = faiss.NewIDSelectorBatch(vectorIDsToInclude) + } + if err != nil { + return nil, err + } + + // Ordering the retrieved centroid IDs by increasing order + // of distance i.e. decreasing order of proximity to query vector. + closestCentroidIDs, centroidDistances, _ := + vecIndex.ObtainClustersWithDistancesFromIVFIndex(qVector, + eligibleCentroidIDs) + + // Getting the nprobe value set at index time. + nprobe := vecIndex.GetNProbe() + + eligibleDocsTillNow := int64(0) + minEligibleCentroids := 0 + for i, centroidID := range closestCentroidIDs { + eligibleDocsTillNow += int64(centroidVecIDMap[centroidID].GetCardinality()) + if eligibleDocsTillNow >= k && i >= int(nprobe-1) { + // Continue till at least 'K' cumulative vectors are + // collected or 'nprobe' clusters are examined, whichever + // comes later. + minEligibleCentroids = i + 1 + break + } + minEligibleCentroids = i + 1 + } + + // Search the clusters specified by 'closestCentroidIDs' for + // vectors whose IDs are present in 'vectorIDsToInclude' + scores, ids, err := vecIndex.SearchClustersFromIVFIndex( + selector, len(vectorIDsToInclude), closestCentroidIDs, + minEligibleCentroids, k, qVector, centroidDistances, params) + if err != nil { + return nil, err + } + + addIDsToPostingsList(rv, ids, scores) + return rv, nil + } return rv, nil }, close: func() { @@ -372,8 +586,9 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap pos += n } - vecIndex, vecDocIDMap, vectorIDsToExclude, err = - sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except) + vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err = + sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering, + except) if vecIndex != nil { vecIndexSize = vecIndex.Size() diff --git a/vendor/github.com/blevesearch/zapx/v16/intDecoder.go b/vendor/github.com/blevesearch/zapx/v16/intDecoder.go index e50c471722..1a69e61450 100644 --- a/vendor/github.com/blevesearch/zapx/v16/intDecoder.go +++ b/vendor/github.com/blevesearch/zapx/v16/intDecoder.go @@ -27,7 +27,6 @@ type chunkedIntDecoder struct { data []byte r *memUvarintReader - // atomic access to this variable bytesRead uint64 } diff --git a/vendor/github.com/blevesearch/zapx/v16/intcoder.go b/vendor/github.com/blevesearch/zapx/v16/intcoder.go index 2957fbd098..e1586edc46 100644 --- a/vendor/github.com/blevesearch/zapx/v16/intcoder.go +++ b/vendor/github.com/blevesearch/zapx/v16/intcoder.go @@ -18,7 +18,6 @@ import ( "bytes" "encoding/binary" "io" - "sync/atomic" ) // We can safely use 0 to represent termNotEncoded since 0 @@ -36,7 +35,6 @@ type chunkedIntCoder struct { buf []byte - // atomic access to this variable bytesWritten uint64 } @@ -79,11 +77,11 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) { } func (c *chunkedIntCoder) incrementBytesWritten(val uint64) { - atomic.AddUint64(&c.bytesWritten, val) + c.bytesWritten += val } func (c *chunkedIntCoder) getBytesWritten() uint64 { - return atomic.LoadUint64(&c.bytesWritten) + return c.bytesWritten } // Add encodes the provided integers into the correct chunk for the provided diff --git a/vendor/github.com/blevesearch/zapx/v16/merge.go b/vendor/github.com/blevesearch/zapx/v16/merge.go index 490e9da016..683d92098a 100644 --- a/vendor/github.com/blevesearch/zapx/v16/merge.go +++ b/vendor/github.com/blevesearch/zapx/v16/merge.go @@ -73,8 +73,8 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat // wrap it for counting (tracking offsets) cr := NewCountHashWriterWithStatsReporter(br, s) - newDocNums, numDocs, storedIndexOffset, _, _, _, sectionsIndexOffset, err := - MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh) + newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err := + mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh) if err != nil { cleanup() return nil, 0, err @@ -109,9 +109,9 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat return newDocNums, uint64(cr.Count()), nil } -func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, +func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) ( - newDocNums [][]uint64, numDocs, storedIndexOffset uint64, dictLocs []uint64, + newDocNums [][]uint64, numDocs, storedIndexOffset uint64, fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64, err error) { @@ -122,7 +122,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, numDocs = computeNewDocCount(segments, drops) if isClosed(closeCh) { - return nil, 0, 0, nil, nil, nil, 0, seg.ErrClosed + return nil, 0, 0, nil, nil, 0, seg.ErrClosed } // the merge opaque is especially important when it comes to tracking the file @@ -140,7 +140,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh) if err != nil { - return nil, 0, 0, nil, nil, nil, 0, err + return nil, 0, 0, nil, nil, 0, err } // at this point, ask each section implementation to merge itself @@ -149,21 +149,19 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh) if err != nil { - return nil, 0, 0, nil, nil, nil, 0, err + return nil, 0, 0, nil, nil, 0, err } } - } else { - dictLocs = make([]uint64, len(fieldsInv)) } // we can persist the fields section index now, this will point // to the various indexes (each in different section) available for a field. - sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, dictLocs, mergeOpaque) + sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque) if err != nil { - return nil, 0, 0, nil, nil, nil, 0, err + return nil, 0, 0, nil, nil, 0, err } - return newDocNums, numDocs, storedIndexOffset, dictLocs, fieldsInv, fieldsMap, sectionsIndexOffset, nil + return newDocNums, numDocs, storedIndexOffset, fieldsInv, fieldsMap, sectionsIndexOffset, nil } // mapFields takes the fieldsInv list and returns a map of fieldName @@ -306,6 +304,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo use1HitEncoding func(uint64) (bool, uint64, uint64), w *CountHashWriter, bufMaxVarintLen64 []byte) ( offset uint64, err error) { + if postings == nil { + return 0, nil + } + termCardinality := postings.GetCardinality() if termCardinality <= 0 { return 0, nil diff --git a/vendor/github.com/blevesearch/zapx/v16/new.go b/vendor/github.com/blevesearch/zapx/v16/new.go index 94079eaf4c..f0d37c4348 100644 --- a/vendor/github.com/blevesearch/zapx/v16/new.go +++ b/vendor/github.com/blevesearch/zapx/v16/new.go @@ -66,14 +66,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, s.chunkMode = chunkMode s.w = NewCountHashWriter(&br) - storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert() + storedIndexOffset, sectionsIndexOffset, err := s.convert() if err != nil { return nil, uint64(0), err } sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode, - s.FieldsMap, s.FieldsInv, uint64(len(results)), - storedIndexOffset, dictOffsets, sectionsIndexOffset) + uint64(len(results)), storedIndexOffset, sectionsIndexOffset) // get the bytes written before the interim's reset() call // write it to the newly formed segment base. @@ -125,8 +124,10 @@ func (s *interim) reset() (err error) { s.results = nil s.chunkMode = 0 s.w = nil - s.FieldsMap = nil - s.FieldsInv = nil + for k := range s.FieldsMap { + delete(s.FieldsMap, k) + } + s.FieldsInv = s.FieldsInv[:0] s.metaBuf.Reset() s.tmp0 = s.tmp0[:0] s.tmp1 = s.tmp1[:0] @@ -168,8 +169,10 @@ type interimLoc struct { arrayposs []uint64 } -func (s *interim) convert() (uint64, []uint64, uint64, error) { - s.FieldsMap = map[string]uint16{} +func (s *interim) convert() (uint64, uint64, error) { + if s.FieldsMap == nil { + s.FieldsMap = map[string]uint16{} + } args := map[string]interface{}{ "results": s.results, @@ -209,17 +212,15 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) { storedIndexOffset, err := s.writeStoredFields() if err != nil { - return 0, nil, 0, err + return 0, 0, err } - var dictOffsets []uint64 - // we can persist the various sections at this point. // the rule of thumb here is that each section must persist field wise. for _, x := range segmentSections { _, err = x.Persist(s.opaque, s.w) if err != nil { - return 0, nil, 0, err + return 0, 0, err } } @@ -231,18 +232,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) { } } - if len(s.results) == 0 { - dictOffsets = make([]uint64, len(s.FieldsInv)) - } - // we can persist a new fields section here // this new fields section will point to the various indexes available - sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, dictOffsets, s.opaque) + sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque) if err != nil { - return 0, nil, 0, err + return 0, 0, err } - return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil + return storedIndexOffset, sectionsIndexOffset, nil } func (s *interim) getOrDefineField(fieldName string) int { diff --git a/vendor/github.com/blevesearch/zapx/v16/posting.go b/vendor/github.com/blevesearch/zapx/v16/posting.go index ad47df0dd6..07ae202f6d 100644 --- a/vendor/github.com/blevesearch/zapx/v16/posting.go +++ b/vendor/github.com/blevesearch/zapx/v16/posting.go @@ -305,10 +305,7 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { chunkSize, err := getChunkSize(d.sb.chunkMode, rv.postings.GetCardinality(), d.sb.numDocs) if err != nil { - return err - } else if chunkSize == 0 { - return fmt.Errorf("chunk size is zero, chunkMode: %v, numDocs: %v", - d.sb.chunkMode, d.sb.numDocs) + return fmt.Errorf("failed to get chunk size: %v", err) } rv.chunkSize = chunkSize @@ -632,6 +629,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool, return i.nextDocNumAtOrAfterClean(atOrAfter) } + if i.postings.chunkSize == 0 { + return 0, false, ErrChunkSizeZero + } + i.Actual.AdvanceIfNeeded(uint32(atOrAfter)) if !i.Actual.HasNext() || !i.all.HasNext() { @@ -741,6 +742,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfterClean( return uint64(i.Actual.Next()), true, nil } + if i.postings != nil && i.postings.chunkSize == 0 { + return 0, false, ErrChunkSizeZero + } + // freq-norm's needed, so maintain freq-norm chunk reader sameChunkNexts := 0 // # of times we called Next() in the same chunk n := i.Actual.Next() diff --git a/vendor/github.com/blevesearch/zapx/v16/section_faiss_vector_index.go b/vendor/github.com/blevesearch/zapx/v16/section_faiss_vector_index.go index c73bf01113..1c9f91a06c 100644 --- a/vendor/github.com/blevesearch/zapx/v16/section_faiss_vector_index.go +++ b/vendor/github.com/blevesearch/zapx/v16/section_faiss_vector_index.go @@ -68,12 +68,14 @@ func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID return vo.fieldAddrs[uint16(fieldID)] } -// metadata corresponding to a serialized vector index -type vecIndexMeta struct { +// information specific to a vector index - (including metadata and +// the index pointer itself) +type vecIndexInfo struct { startOffset int indexSize uint64 vecIds []int64 indexOptimizedFor string + index *faiss.IndexImpl } // keep in mind with respect to update and delete operations with respect to vectors @@ -87,7 +89,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se // in the segment this will help by avoiding multiple allocation // calls. vecSegs := make([]*SegmentBase, 0, len(segments)) - indexes := make([]*vecIndexMeta, 0, len(segments)) + indexes := make([]*vecIndexInfo, 0, len(segments)) for fieldID, fieldName := range fieldsInv { indexes = indexes[:0] // resizing the slices @@ -128,7 +130,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se pos += n vecSegs = append(vecSegs, sb) - indexes = append(indexes, &vecIndexMeta{ + indexes = append(indexes, &vecIndexInfo{ vecIds: make([]int64, 0, numVecs), indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], }) @@ -182,7 +184,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se } func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter, - vecToDocID map[int64]uint64, indexes []*vecIndexMeta) error { + vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error { tempBuf := v.grabBuf(binary.MaxVarintLen64) // early exit if there are absolutely no valid vectors present in the segment @@ -275,9 +277,14 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 { // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, - indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error { + vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error { - vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs)) + // safe to assume that all the indexes are of the same config values, given + // that they are extracted from the field mapping info. + var dims, metric int + var indexOptimizedFor string + + var validMerge bool var finalVecIDCap, indexDataCap, reconsCap int for segI, segBase := range sbs { // Considering merge operations on vector indexes are expensive, it is @@ -287,26 +294,37 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, freeReconstructedIndexes(vecIndexes) return seg.ErrClosed } + if len(vecIndexes[segI].vecIds) == 0 { + // no valid vectors for this index, don't bring it into memory + continue + } + // read the index bytes. todo: parallelize this - indexBytes := segBase.mem[indexes[segI].startOffset : indexes[segI].startOffset+int(indexes[segI].indexSize)] + indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)] index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags) if err != nil { freeReconstructedIndexes(vecIndexes) return err } - if len(indexes[segI].vecIds) > 0 { - indexReconsLen := len(indexes[segI].vecIds) * index.D() + if len(vecIndexes[segI].vecIds) > 0 { + indexReconsLen := len(vecIndexes[segI].vecIds) * index.D() if indexReconsLen > reconsCap { reconsCap = indexReconsLen } indexDataCap += indexReconsLen - finalVecIDCap += len(indexes[segI].vecIds) + finalVecIDCap += len(vecIndexes[segI].vecIds) } - vecIndexes = append(vecIndexes, index) + vecIndexes[segI].index = index + + validMerge = true + // set the dims and metric values from the constructed index. + dims = index.D() + metric = int(index.MetricType()) + indexOptimizedFor = vecIndexes[segI].indexOptimizedFor } - // no vector indexes to merge - if len(vecIndexes) == 0 { + // not a valid merge operation as there are no valid indexes to merge. + if !validMerge { return nil } @@ -326,18 +344,18 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // reconstruct the vectors only if present, it could be that // some of the indexes had all of their vectors updated/deleted. - if len(indexes[i].vecIds) > 0 { - neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D() + if len(vecIndexes[i].vecIds) > 0 { + neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D() recons = recons[:neededReconsLen] // todo: parallelize reconstruction - recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons) + recons, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, recons) if err != nil { freeReconstructedIndexes(vecIndexes) return err } indexData = append(indexData, recons...) // Adding vector IDs in the same order as the vectors - finalVecIDs = append(finalVecIDs, indexes[i].vecIds...) + finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...) } } @@ -351,12 +369,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, nvecs := len(finalVecIDs) - // safe to assume that all the indexes are of the same config values, given - // that they are extracted from the field mapping info. - dims := vecIndexes[0].D() - metric := vecIndexes[0].MetricType() - indexOptimizedFor := indexes[0].indexOptimizedFor - // index type to be created after merge based on the number of vectors // in indexData added into the index. nlist := determineCentroids(nvecs) @@ -419,9 +431,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, } // todo: can be parallelized. -func freeReconstructedIndexes(indexes []*faiss.IndexImpl) { - for _, index := range indexes { - index.Close() +func freeReconstructedIndexes(indexes []*vecIndexInfo) { + for _, entry := range indexes { + if entry.index != nil { + entry.index.Close() + } } } @@ -494,8 +508,10 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint ids = append(ids, hash) } + // Set the faiss metric type (default is Euclidean Distance or l2_norm) var metric = faiss.MetricL2 - if content.metric == index.CosineSimilarity { + if content.metric == index.InnerProduct || content.metric == index.CosineSimilarity { + // use the same FAISS metric for inner product and cosine similarity metric = faiss.MetricInnerProduct } diff --git a/vendor/github.com/blevesearch/zapx/v16/section_inverted_text_index.go b/vendor/github.com/blevesearch/zapx/v16/section_inverted_text_index.go index d19dc9453d..ea8722e475 100644 --- a/vendor/github.com/blevesearch/zapx/v16/section_inverted_text_index.go +++ b/vendor/github.com/blevesearch/zapx/v16/section_inverted_text_index.go @@ -398,11 +398,11 @@ func (i *invertedIndexOpaque) grabBuf(size int) []byte { } func (i *invertedIndexOpaque) incrementBytesWritten(bytes uint64) { - atomic.AddUint64(&i.bytesWritten, bytes) + i.bytesWritten += bytes } func (i *invertedIndexOpaque) BytesWritten() uint64 { - return atomic.LoadUint64(&i.bytesWritten) + return i.bytesWritten } func (i *invertedIndexOpaque) BytesRead() uint64 { @@ -412,7 +412,6 @@ func (i *invertedIndexOpaque) BytesRead() uint64 { func (i *invertedIndexOpaque) ResetBytesRead(uint64) {} func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) { - if io.results == nil || len(io.results) == 0 { return nil, nil } @@ -462,7 +461,11 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin locs := io.Locs[pid] locOffset := 0 - chunkSize, err := getChunkSize(io.chunkMode, postingsBS.GetCardinality(), uint64(len(io.results))) + var cardinality uint64 + if postingsBS != nil { + cardinality = postingsBS.GetCardinality() + } + chunkSize, err := getChunkSize(io.chunkMode, cardinality, uint64(len(io.results))) if err != nil { return nil, err } diff --git a/vendor/github.com/blevesearch/zapx/v16/segment.go b/vendor/github.com/blevesearch/zapx/v16/segment.go index 8dce0856ab..8780ead19c 100644 --- a/vendor/github.com/blevesearch/zapx/v16/segment.go +++ b/vendor/github.com/blevesearch/zapx/v16/segment.go @@ -326,7 +326,7 @@ func (s *SegmentBase) loadFieldsNew() error { if seek > uint64(len(s.mem)) { // handling a buffer overflow case. // a rare case where the backing buffer is not large enough to be read directly via - // a pos+binary.MaxVarinLen64 seek. For eg, this can happen when there is only + // a pos+binary.MaxVarintLen64 seek. For eg, this can happen when there is only // one field to be indexed in the entire batch of data and while writing out // these fields metadata, you write 1 + 8 bytes whereas the MaxVarintLen64 = 10. seek = uint64(len(s.mem)) @@ -342,7 +342,7 @@ func (s *SegmentBase) loadFieldsNew() error { // the following loop will be executed only once in the edge case pointed out above // since there is only field's offset store which occupies 8 bytes. // the pointer then seeks to a position preceding the sectionsIndexOffset, at - // which point the responbility of handling the out-of-bounds cases shifts to + // which point the responsibility of handling the out-of-bounds cases shifts to // the specific section's parsing logic. var fieldID uint64 for fieldID < numFields { @@ -867,15 +867,6 @@ func (s *SegmentBase) loadDvReaders() error { s.incrementBytesRead(read) - dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64]) - if n <= 0 { - return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+ - "offset for sectionID %v field %v", secID, s.fieldsInv[fieldID]) - } - if secID == SectionInvertedTextIndex { - s.dictLocs = append(s.dictLocs, dataLoc) - s.incrementBytesRead(uint64(n)) - } fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd) if err != nil { return err diff --git a/vendor/github.com/blevesearch/zapx/v16/write.go b/vendor/github.com/blevesearch/zapx/v16/write.go index 1906a9bdbd..7b2c99e1ba 100644 --- a/vendor/github.com/blevesearch/zapx/v16/write.go +++ b/vendor/github.com/blevesearch/zapx/v16/write.go @@ -50,7 +50,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, return tw, nil } -func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) { +func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { var rv uint64 fieldsOffsets := make([]uint64, 0, len(fieldsInv)) diff --git a/vendor/modules.txt b/vendor/modules.txt index 143ddb130c..a15e8b0fa2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -160,7 +160,7 @@ github.com/bitly/go-simplejson # github.com/bits-and-blooms/bitset v1.12.0 ## explicit; go 1.16 github.com/bits-and-blooms/bitset -# github.com/blevesearch/bleve/v2 v2.4.2 +# github.com/blevesearch/bleve/v2 v2.4.3 ## explicit; go 1.21 github.com/blevesearch/bleve/v2 github.com/blevesearch/bleve/v2/analysis @@ -202,15 +202,15 @@ github.com/blevesearch/bleve/v2/search/scorer github.com/blevesearch/bleve/v2/search/searcher github.com/blevesearch/bleve/v2/size github.com/blevesearch/bleve/v2/util -# github.com/blevesearch/bleve_index_api v1.1.10 +# github.com/blevesearch/bleve_index_api v1.1.12 ## explicit; go 1.20 github.com/blevesearch/bleve_index_api # github.com/blevesearch/geo v0.1.20 ## explicit; go 1.18 github.com/blevesearch/geo/geojson github.com/blevesearch/geo/s2 -# github.com/blevesearch/go-faiss v1.0.20 -## explicit; go 1.19 +# github.com/blevesearch/go-faiss v1.0.23 +## explicit; go 1.21 github.com/blevesearch/go-faiss # github.com/blevesearch/go-porterstemmer v1.0.3 ## explicit; go 1.13 @@ -221,8 +221,8 @@ github.com/blevesearch/gtreap # github.com/blevesearch/mmap-go v1.0.4 ## explicit; go 1.13 github.com/blevesearch/mmap-go -# github.com/blevesearch/scorch_segment_api/v2 v2.2.15 -## explicit; go 1.20 +# github.com/blevesearch/scorch_segment_api/v2 v2.2.16 +## explicit; go 1.21 github.com/blevesearch/scorch_segment_api/v2 # github.com/blevesearch/segment v0.9.1 ## explicit; go 1.18 @@ -252,11 +252,11 @@ github.com/blevesearch/zapx/v13 # github.com/blevesearch/zapx/v14 v14.3.10 ## explicit; go 1.19 github.com/blevesearch/zapx/v14 -# github.com/blevesearch/zapx/v15 v15.3.13 +# github.com/blevesearch/zapx/v15 v15.3.16 ## explicit; go 1.19 github.com/blevesearch/zapx/v15 -# github.com/blevesearch/zapx/v16 v16.1.5 -## explicit; go 1.20 +# github.com/blevesearch/zapx/v16 v16.1.8 +## explicit; go 1.21 github.com/blevesearch/zapx/v16 # github.com/bluele/gcache v0.0.2 ## explicit; go 1.15