chore(deps): bump github.com/blevesearch/bleve/v2 from 2.4.2 to 2.4.3

Bumps [github.com/blevesearch/bleve/v2](https://github.com/blevesearch/bleve) from 2.4.2 to 2.4.3.
- [Release notes](https://github.com/blevesearch/bleve/releases)
- [Commits](https://github.com/blevesearch/bleve/compare/v2.4.2...v2.4.3)

---
updated-dependencies:
- dependency-name: github.com/blevesearch/bleve/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-11-14 06:35:46 +00:00
committed by Ralf Haferkamp
parent 926bc8e22a
commit 1104846219
51 changed files with 1321 additions and 269 deletions

12
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/Nerzal/gocloak/v13 v13.9.0
github.com/bbalet/stopwords v1.0.0
github.com/beevik/etree v1.4.1
github.com/blevesearch/bleve/v2 v2.4.2
github.com/blevesearch/bleve/v2 v2.4.3
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.11.0
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1
@@ -135,13 +135,13 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blevesearch/bleve_index_api v1.1.10 // indirect
github.com/blevesearch/bleve_index_api v1.1.12 // indirect
github.com/blevesearch/geo v0.1.20 // indirect
github.com/blevesearch/go-faiss v1.0.20 // indirect
github.com/blevesearch/go-faiss v1.0.23 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
github.com/blevesearch/gtreap v0.1.1 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/blevesearch/scorch_segment_api/v2 v2.2.15 // indirect
github.com/blevesearch/scorch_segment_api/v2 v2.2.16 // indirect
github.com/blevesearch/segment v0.9.1 // indirect
github.com/blevesearch/snowballstem v0.9.0 // indirect
github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect
@@ -150,8 +150,8 @@ require (
github.com/blevesearch/zapx/v12 v12.3.10 // indirect
github.com/blevesearch/zapx/v13 v13.3.10 // indirect
github.com/blevesearch/zapx/v14 v14.3.10 // indirect
github.com/blevesearch/zapx/v15 v15.3.13 // indirect
github.com/blevesearch/zapx/v16 v16.1.5 // indirect
github.com/blevesearch/zapx/v15 v15.3.16 // indirect
github.com/blevesearch/zapx/v16 v16.1.8 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/bombsimon/logrusr/v3 v3.1.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect

24
go.sum
View File

@@ -149,22 +149,22 @@ github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngE
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blevesearch/bleve/v2 v2.4.2 h1:NooYP1mb3c0StkiY9/xviiq2LGSaE8BQBCc/pirMx0U=
github.com/blevesearch/bleve/v2 v2.4.2/go.mod h1:ATNKj7Yl2oJv/lGuF4kx39bST2dveX6w0th2FFYLkc8=
github.com/blevesearch/bleve_index_api v1.1.10 h1:PDLFhVjrjQWr6jCuU7TwlmByQVCSEURADHdCqVS9+g0=
github.com/blevesearch/bleve_index_api v1.1.10/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8=
github.com/blevesearch/bleve/v2 v2.4.3 h1:XDYj+1prgX84L2Cf+V3ojrOPqXxy0qxyd2uLMmeuD+4=
github.com/blevesearch/bleve/v2 v2.4.3/go.mod h1:hEPDPrbYw3vyrm5VOa36GyS4bHWuIf4Fflp7460QQXY=
github.com/blevesearch/bleve_index_api v1.1.12 h1:P4bw9/G/5rulOF7SJ9l4FsDoo7UFJ+5kexNy1RXfegY=
github.com/blevesearch/bleve_index_api v1.1.12/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8=
github.com/blevesearch/geo v0.1.20 h1:paaSpu2Ewh/tn5DKn/FB5SzvH0EWupxHEIwbCk/QPqM=
github.com/blevesearch/geo v0.1.20/go.mod h1:DVG2QjwHNMFmjo+ZgzrIq2sfCh6rIHzy9d9d0B59I6w=
github.com/blevesearch/go-faiss v1.0.20 h1:AIkdTQFWuZ5LQmKQSebgMR4RynGNw8ZseJXaan5kvtI=
github.com/blevesearch/go-faiss v1.0.20/go.mod h1:jrxHrbl42X/RnDPI+wBoZU8joxxuRwedrxqswQ3xfU8=
github.com/blevesearch/go-faiss v1.0.23 h1:Wmc5AFwDLKGl2L6mjLX1Da3vCL0EKa2uHHSorcIS1Uc=
github.com/blevesearch/go-faiss v1.0.23/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo=
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y=
github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.2.15 h1:prV17iU/o+A8FiZi9MXmqbagd8I0bCqM7OKUYPbnb5Y=
github.com/blevesearch/scorch_segment_api/v2 v2.2.15/go.mod h1:db0cmP03bPNadXrCDuVkKLV6ywFSiRgPFT1YVrestBc=
github.com/blevesearch/scorch_segment_api/v2 v2.2.16 h1:uGvKVvG7zvSxCwcm4/ehBa9cCEuZVE+/zvrSl57QUVY=
github.com/blevesearch/scorch_segment_api/v2 v2.2.16/go.mod h1:VF5oHVbIFTu+znY1v30GjSpT5+9YFs9dV2hjvuh34F0=
github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU=
github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
@@ -181,10 +181,10 @@ github.com/blevesearch/zapx/v13 v13.3.10 h1:0KY9tuxg06rXxOZHg3DwPJBjniSlqEgVpxIq
github.com/blevesearch/zapx/v13 v13.3.10/go.mod h1:w2wjSDQ/WBVeEIvP0fvMJZAzDwqwIEzVPnCPrz93yAk=
github.com/blevesearch/zapx/v14 v14.3.10 h1:SG6xlsL+W6YjhX5N3aEiL/2tcWh3DO75Bnz77pSwwKU=
github.com/blevesearch/zapx/v14 v14.3.10/go.mod h1:qqyuR0u230jN1yMmE4FIAuCxmahRQEOehF78m6oTgns=
github.com/blevesearch/zapx/v15 v15.3.13 h1:6EkfaZiPlAxqXz0neniq35my6S48QI94W/wyhnpDHHQ=
github.com/blevesearch/zapx/v15 v15.3.13/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg=
github.com/blevesearch/zapx/v16 v16.1.5 h1:b0sMcarqNFxuXvjoXsF8WtwVahnxyhEvBSRJi/AUHjU=
github.com/blevesearch/zapx/v16 v16.1.5/go.mod h1:J4mSF39w1QELc11EWRSBFkPeZuO7r/NPKkHzDCoiaI8=
github.com/blevesearch/zapx/v15 v15.3.16 h1:Ct3rv7FUJPfPk99TI/OofdC+Kpb4IdyfdMH48sb+FmE=
github.com/blevesearch/zapx/v15 v15.3.16/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg=
github.com/blevesearch/zapx/v16 v16.1.8 h1:Bxzpw6YQpFs7UjoCV1+RvDw6fmAT2GZxldwX8b3wVBM=
github.com/blevesearch/zapx/v16 v16.1.8/go.mod h1:JqQlOqlRVaYDkpLIl3JnKql8u4zKTNlVEa3nLsi0Gn8=
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=

View File

@@ -9,28 +9,29 @@
[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
A modern indexing library in GO
A modern indexing + search library in Go
## Features
* Index any go data structure (including JSON)
* Intelligent defaults backed up by powerful configuration
* Index any Go data structure or JSON
* Intelligent defaults backed up by powerful configuration ([scorch](https://github.com/blevesearch/bleve/blob/master/index/scorch/README.md))
* Supported field types:
* `text`, `number`, `datetime`, `boolean`, `geopoint`, `geoshape`, `IP`, `vector`
* Supported query types:
* Term, Phrase, Match, Match Phrase, Prefix, Fuzzy
* Conjunction, Disjunction, Boolean (`must`/`should`/`must_not`)
* Term Range, Numeric Range, Date Range
* [Geo Spatial](https://github.com/blevesearch/bleve/blob/master/geo/README.md)
* Simple [query string syntax](http://www.blevesearch.com/docs/Query-String-Query/)
* Approximate k-nearest neighbors over [vectors](https://github.com/blevesearch/bleve/blob/master/docs/vectors.md)
* [tf-idf](https://en.wikipedia.org/wiki/Tf-idf) Scoring
* `term`, `phrase`, `match`, `match_phrase`, `prefix`, `regexp`, `wildcard`, `fuzzy`
* term range, numeric range, date range, boolean field
* compound queries: `conjuncts`, `disjuncts`, boolean (`must`/`should`/`must_not`)
* [query string syntax](http://www.blevesearch.com/docs/Query-String-Query/)
* [geo spatial search](https://github.com/blevesearch/bleve/blob/master/geo/README.md)
* approximate k-nearest neighbors via [vector search](https://github.com/blevesearch/bleve/blob/master/docs/vectors.md)
* [tf-idf](https://en.wikipedia.org/wiki/Tf-idf) scoring
* Hybrid search: exact + semantic
* Query time boosting
* Search result match highlighting with document fragments
* Aggregations/faceting support:
* Terms Facet
* Numeric Range Facet
* Date Range Facet
* terms facet
* numeric range facet
* date range facet
## Indexing

View File

@@ -46,6 +46,9 @@ func (b *Batch) Index(id string, data interface{}) error {
if id == "" {
return ErrorEmptyID
}
if eventIndex, ok := b.index.(index.EventIndex); ok {
eventIndex.FireIndexEvent()
}
doc := document.NewDocument(id)
err := b.index.Mapping().MapDocument(doc, data)
if err != nil {

View File

@@ -67,3 +67,7 @@ var EventKindMergeTaskIntroduction = EventKind(8)
// EventKindPreMergeCheck is fired before the merge begins to check if
// the caller should proceed with the merge.
var EventKindPreMergeCheck = EventKind(9)
// EventKindIndexStart is fired when Index() is invoked which
// creates a new Document object from an interface using the index mapping.
var EventKindIndexStart = EventKind(10)

View File

@@ -23,6 +23,7 @@ import (
"sync"
"sync/atomic"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/search"
index "github.com/blevesearch/bleve_index_api"
segment_api "github.com/blevesearch/scorch_segment_api/v2"
@@ -34,6 +35,8 @@ type OptimizeVR struct {
totalCost uint64
// maps field to vector readers
vrs map[string][]*IndexSnapshotVectorReader
// if at least one of the vector readers requires filtered kNN.
requiresFiltering bool
}
// This setting _MUST_ only be changed during init and not after.
@@ -62,6 +65,11 @@ func (o *OptimizeVR) Finish() error {
var errorsM sync.Mutex
var errors []error
var snapshotGlobalDocNums map[int]*roaring.Bitmap
if o.requiresFiltering {
snapshotGlobalDocNums = o.snapshot.globalDocNums()
}
defer o.invokeSearcherEndCallback()
wg := sync.WaitGroup{}
@@ -77,7 +85,8 @@ func (o *OptimizeVR) Finish() error {
wg.Done()
}()
for field, vrs := range o.vrs {
vecIndex, err := segment.InterpretVectorIndex(field, origSeg.deleted)
vecIndex, err := segment.InterpretVectorIndex(field,
o.requiresFiltering, origSeg.deleted)
if err != nil {
errorsM.Lock()
errors = append(errors, err)
@@ -89,9 +98,37 @@ func (o *OptimizeVR) Finish() error {
vectorIndexSize := vecIndex.Size()
origSeg.cachedMeta.updateMeta(field, vectorIndexSize)
for _, vr := range vrs {
var pl segment_api.VecPostingsList
var err error
// for each VR, populate postings list and iterators
// by passing the obtained vector index and getting similar vectors.
pl, err := vecIndex.Search(vr.vector, vr.k, vr.searchParams)
// Only applies to filtered kNN.
if vr.eligibleDocIDs != nil && len(vr.eligibleDocIDs) > 0 {
eligibleVectorInternalIDs := vr.getEligibleDocIDs()
if snapshotGlobalDocNums != nil {
// Only the eligible documents belonging to this segment
// will get filtered out.
// There is no way to determine which doc belongs to which segment
eligibleVectorInternalIDs.And(snapshotGlobalDocNums[index])
}
eligibleLocalDocNums := make([]uint64,
eligibleVectorInternalIDs.GetCardinality())
// get the (segment-)local document numbers
for i, docNum := range eligibleVectorInternalIDs.ToArray() {
localDocNum := o.snapshot.localDocNumFromGlobal(index,
uint64(docNum))
eligibleLocalDocNums[i] = localDocNum
}
pl, err = vecIndex.SearchWithFilter(vr.vector, vr.k,
eligibleLocalDocNums, vr.searchParams)
} else {
pl, err = vecIndex.Search(vr.vector, vr.k, vr.searchParams)
}
if err != nil {
errorsM.Lock()
errors = append(errors, err)
@@ -140,6 +177,9 @@ func (s *IndexSnapshotVectorReader) VectorOptimize(ctx context.Context,
return octx, nil
}
o.ctx = ctx
if !o.requiresFiltering {
o.requiresFiltering = len(s.eligibleDocIDs) > 0
}
if o.snapshot != s.snapshot {
o.invokeSearcherEndCallback()

View File

@@ -49,7 +49,7 @@ type Scorch struct {
unsafeBatch bool
rootLock sync.RWMutex
rootLock sync.RWMutex
root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
@@ -376,6 +376,8 @@ func (s *Scorch) Delete(id string) error {
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()
// notify handlers that we're about to index a batch of data
s.fireEvent(EventKindBatchIntroductionStart, 0)
defer func() {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()
@@ -434,9 +436,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
indexStart := time.Now()
// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)
var newSegment segment.Segment
var bufBytes uint64
stats := newFieldStats()
@@ -878,3 +877,8 @@ func (s *Scorch) CopyReader() index.CopyReader {
s.rootLock.Unlock()
return rv
}
// FireIndexEvent is scorch's implementation of the optional
// index.EventIndex interface: it allows a caller outside bleve to fire
// an EventKindIndexStart event (with zero duration) on this index.
func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
}

View File

@@ -471,16 +471,44 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) {
return rvd, nil
}
// In a multi-segment index, each document has:
// 1. a local docnum - local to the segment
// 2. a global docnum - unique identifier across the index
// This function returns the segment index(the segment in which the docnum is present)
// and local docnum of a document.
func (is *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int, uint64) {
segmentIndex := sort.Search(len(is.offsets),
func(x int) bool {
return is.offsets[x] > docNum
}) - 1
localDocNum := docNum - is.offsets[segmentIndex]
localDocNum := is.localDocNumFromGlobal(segmentIndex, docNum)
return int(segmentIndex), localDocNum
}
// localDocNumFromGlobal maps a global document number to its
// segment-local document number, given the index of the segment the
// document lives in (offsets holds each segment's starting global docnum).
func (is *IndexSnapshot) localDocNumFromGlobal(segmentIndex int, docNum uint64) uint64 {
	segmentStart := is.offsets[segmentIndex]
	return docNum - segmentStart
}
// globalDocNums returns, for every segment in this snapshot, a bitmap of
// the global doc numbers of the live (non-deleted) documents in that
// segment, keyed by segment index. Returns nil when the snapshot has no
// segments.
func (is *IndexSnapshot) globalDocNums() map[int]*roaring.Bitmap {
	if len(is.segment) == 0 {
		return nil
	}
	rv := make(map[int]*roaring.Bitmap, len(is.segment))
	for segIdx := range is.segment {
		bm := roaring.NewBitmap()
		// shift each segment-local live docnum by the segment's offset
		// to obtain the snapshot-global docnum
		offset := uint32(is.offsets[segIdx])
		for _, localDocNum := range is.segment[segIdx].DocNumbersLive().ToArray() {
			bm.Add(localDocNum + offset)
		}
		rv[segIdx] = bm
	}
	return rv
}
func (is *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
docNum, err := docInternalToNumber(id)
if err != nil {

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"reflect"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
segment_api "github.com/blevesearch/scorch_segment_api/v2"
@@ -51,6 +52,31 @@ type IndexSnapshotVectorReader struct {
ctx context.Context
searchParams json.RawMessage
// The following fields are only applicable for vector readers which will
// process pre-filtered kNN queries.
eligibleDocIDs []index.IndexInternalID
}
// getEligibleDocIDs packs the internal IDs of the eligible documents
// into a roaring bitmap, so that fast set operations (e.g. ANDing with
// the docs present in a particular segment) can be applied to them.
// IDs that fail to decode are silently skipped, as before.
func (i *IndexSnapshotVectorReader) getEligibleDocIDs() *roaring.Bitmap {
	rv := roaring.NewBitmap()
	if len(i.eligibleDocIDs) == 0 {
		return rv
	}
	docNums := make([]uint32, 0, len(i.eligibleDocIDs))
	for _, internalID := range i.eligibleDocIDs {
		docNum, err := docInternalToNumber(internalID)
		if err != nil {
			// undecodable ID: leave it out of the bitmap
			continue
		}
		docNums = append(docNums, uint32(docNum))
	}
	rv.AddMany(docNums)
	return rv
}
func (i *IndexSnapshotVectorReader) Size() int {
@@ -108,7 +134,17 @@ func (i *IndexSnapshotVectorReader) Advance(ID index.IndexInternalID,
preAlloced *index.VectorDoc) (*index.VectorDoc, error) {
if i.currPosting != nil && bytes.Compare(i.currID, ID) >= 0 {
i2, err := i.snapshot.VectorReader(i.ctx, i.vector, i.field, i.k, i.searchParams)
var i2 index.VectorReader
var err error
if len(i.eligibleDocIDs) > 0 {
i2, err = i.snapshot.VectorReaderWithFilter(i.ctx, i.vector, i.field,
i.k, i.searchParams, i.eligibleDocIDs)
} else {
i2, err = i.snapshot.VectorReader(i.ctx, i.vector, i.field, i.k,
i.searchParams)
}
if err != nil {
return nil, err
}

View File

@@ -48,3 +48,29 @@ func (is *IndexSnapshot) VectorReader(ctx context.Context, vector []float32,
return rv, nil
}
// VectorReaderWithFilter returns a vector reader that is restricted to
// the supplied eligible document IDs, for pre-filtered kNN searches.
// The per-segment postings and iterator slices are sized here but left
// unpopulated; they are filled in within OptimizeVR's Finish().
func (is *IndexSnapshot) VectorReaderWithFilter(ctx context.Context, vector []float32,
	field string, k int64, searchParams json.RawMessage,
	filterIDs []index.IndexInternalID) (
	index.VectorReader, error) {
	numSegments := len(is.segment)
	rv := &IndexSnapshotVectorReader{
		snapshot:       is,
		vector:         vector,
		field:          field,
		k:              k,
		searchParams:   searchParams,
		eligibleDocIDs: filterIDs,
	}
	if rv.postings == nil {
		rv.postings = make([]segment_api.VecPostingsList, numSegments)
	}
	if rv.iterators == nil {
		rv.iterators = make([]segment_api.VecPostingsIterator, numSegments)
	}
	return rv, nil
}

View File

@@ -256,6 +256,8 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) {
return ErrorIndexClosed
}
i.FireIndexEvent()
doc := document.NewDocument(id)
err = i.m.MapDocument(doc, data)
if err != nil {
@@ -1112,3 +1114,16 @@ func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
return os.OpenFile(filepath.Join(string(f), dir, file),
os.O_RDWR|os.O_CREATE, 0600)
}
// FireIndexEvent forwards an Index() event to the underlying index
// implementation, provided that implementation opts into event
// callbacks by implementing index.EventIndex. Firing is best-effort:
// any error obtaining the internal index silently skips the event.
func (i *indexImpl) FireIndexEvent() {
	internalIndex, err := i.Advanced()
	if err != nil {
		// no access to the internal index; nothing to notify
		return
	}
	eventIndex, ok := internalIndex.(index.EventIndex)
	if ok {
		eventIndex.FireIndexEvent()
	}
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/blevesearch/bleve/v2/document"
"github.com/blevesearch/bleve/v2/util"
index "github.com/blevesearch/bleve_index_api"
faiss "github.com/blevesearch/go-faiss"
)
// Min and Max allowed dimensions for a vector field;
@@ -140,6 +141,10 @@ func (fm *FieldMapping) processVector(propertyMightBeVector interface{},
if !ok {
return false
}
// normalize raw vector if similarity is cosine
if fm.Similarity == index.CosineSimilarity {
vector = NormalizeVector(vector)
}
fieldName := getFieldName(pathString, path, fm)
options := fm.Options()
@@ -163,6 +168,10 @@ func (fm *FieldMapping) processVectorBase64(propertyMightBeVectorBase64 interfac
if err != nil || len(decodedVector) != fm.Dims {
return
}
// normalize raw vector if similarity is cosine
if fm.Similarity == index.CosineSimilarity {
decodedVector = NormalizeVector(decodedVector)
}
fieldName := getFieldName(pathString, path, fm)
options := fm.Options()
@@ -252,3 +261,12 @@ func validateVectorFieldAlias(field *FieldMapping, parentName string,
return nil
}
// NormalizeVector returns a normalized copy of vec. The input slice is
// never mutated: faiss.NormalizeVector normalizes in place, so we hand
// it a clone of the original data.
func NormalizeVector(vec []float32) []float32 {
	clone := append(make([]float32, 0, len(vec)), vec...)
	return faiss.NormalizeVector(clone)
}

View File

@@ -0,0 +1,157 @@
// Copyright (c) 2024 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"fmt"
"time"
"github.com/blevesearch/bleve/v2/search"
index "github.com/blevesearch/bleve_index_api"
)
// EligibleCollector gathers only the internal IDs of matching documents,
// with no scoring, sorting, or skipping. It is used to compute the set
// of documents eligible for a pre-filtered kNN search.
type EligibleCollector struct {
size int // capacity hint for the ids slice
total uint64 // number of documents seen during Collect
took time.Duration // wall-clock duration of the last Collect call
results search.DocumentMatchCollection // never populated here; Results() returns nil
ids []index.IndexInternalID // collected internal doc IDs (the eligible set)
}
// NewEligibleCollector returns a collector that records up to roughly
// size eligible document IDs (size is a capacity hint, not a limit).
func NewEligibleCollector(size int) *EligibleCollector {
return newEligibleCollector(size)
}

// newEligibleCollector constructs the collector with a pre-sized ids
// slice.
func newEligibleCollector(size int) *EligibleCollector {
// No sort order & skip always 0 since this is only to filter eligible docs.
ec := &EligibleCollector{size: size,
ids: make([]index.IndexInternalID, 0, size),
}
return ec
}
// makeEligibleDocumentMatchHandler returns a DocumentMatchHandler that
// appends each matching document's internal ID to the EligibleCollector
// and recycles the DocumentMatch back into the pool. It errors if the
// search context's collector is not an *EligibleCollector.
func makeEligibleDocumentMatchHandler(ctx *search.SearchContext) (search.DocumentMatchHandler, error) {
	ec, ok := ctx.Collector.(*EligibleCollector)
	if !ok {
		// fix: error message previously misspelled "eligiblity"
		return nil, fmt.Errorf("eligibility collector not available")
	}
	return func(d *search.DocumentMatch) error {
		// a nil match is the finalize/flush signal; nothing to record
		if d == nil {
			return nil
		}
		// copy the ID: d and its backing memory are recycled below
		copyOfID := make([]byte, len(d.IndexInternalID))
		copy(copyOfID, d.IndexInternalID)
		ec.ids = append(ec.ids, copyOfID)
		// recycle the DocumentMatch
		ctx.DocumentMatchPool.Put(d)
		return nil
	}, nil
}
// Collect drives the searcher to completion, recording the internal ID
// of every hit via the eligible-document match handler. It honors ctx
// cancellation (checked before the first Next and every CheckDoneEvery
// documents thereafter) and records the elapsed time in ec.took.
func (ec *EligibleCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
startTime := time.Now()
var err error
var next *search.DocumentMatch
// cap the pool pre-allocation to avoid huge up-front allocations for
// very large requested sizes
backingSize := ec.size
if backingSize > PreAllocSizeSkipCap {
backingSize = PreAllocSizeSkipCap + 1
}
searchContext := &search.SearchContext{
DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), 0),
Collector: ec,
IndexReader: reader,
}
dmHandler, err := makeEligibleDocumentMatchHandler(searchContext)
if err != nil {
return err
}
// check for cancellation once before pulling the first hit
select {
case <-ctx.Done():
search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
}
for err == nil && next != nil {
// periodically re-check cancellation while iterating hits
if ec.total%CheckDoneEvery == 0 {
select {
case <-ctx.Done():
search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
}
}
ec.total++
err = dmHandler(next)
if err != nil {
break
}
next, err = searcher.Next(searchContext)
}
if err != nil {
return err
}
// help finalize/flush the results in case
// of custom document match handlers.
err = dmHandler(nil)
if err != nil {
return err
}
// compute search duration
ec.took = time.Since(startTime)
return nil
}
// Results always returns nil: this collector accumulates only document
// IDs, never DocumentMatch results.
func (ec *EligibleCollector) Results() search.DocumentMatchCollection {
return nil
}

// IDs returns the internal IDs of the eligible documents collected so far.
func (ec *EligibleCollector) IDs() []index.IndexInternalID {
return ec.ids
}

// Total returns the number of documents seen during Collect.
func (ec *EligibleCollector) Total() uint64 {
return ec.total
}

// No concept of scoring in the eligible collector.
func (ec *EligibleCollector) MaxScore() float64 {
return 0
}

// Took returns the wall-clock duration of the last Collect call.
func (ec *EligibleCollector) Took() time.Duration {
return ec.took
}

// SetFacetsBuilder is a no-op for this collector.
func (ec *EligibleCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
// facet unsupported for pre-filtering in KNN search
}

// FacetResults always returns nil for this collector.
func (ec *EligibleCollector) FacetResults() search.FacetResults {
// facet unsupported for pre-filtering in KNN search
return nil
}

View File

@@ -14,7 +14,9 @@
package collector
import "github.com/blevesearch/bleve/v2/search"
import (
"github.com/blevesearch/bleve/v2/search"
)
type collectStoreSlice struct {
slice search.DocumentMatchCollection

View File

@@ -74,6 +74,9 @@ func (q *BooleanQuery) SetMinShould(minShould float64) {
}
func (q *BooleanQuery) AddMust(m ...Query) {
if m == nil {
return
}
if q.Must == nil {
tmp := NewConjunctionQuery([]Query{})
tmp.queryStringMode = q.queryStringMode
@@ -85,6 +88,9 @@ func (q *BooleanQuery) AddMust(m ...Query) {
}
func (q *BooleanQuery) AddShould(m ...Query) {
if m == nil {
return
}
if q.Should == nil {
tmp := NewDisjunctionQuery([]Query{})
tmp.queryStringMode = q.queryStringMode
@@ -96,6 +102,9 @@ func (q *BooleanQuery) AddShould(m ...Query) {
}
func (q *BooleanQuery) AddMustNot(m ...Query) {
if m == nil {
return
}
if q.MustNot == nil {
tmp := NewDisjunctionQuery([]Query{})
tmp.queryStringMode = q.queryStringMode

View File

@@ -35,7 +35,9 @@ type KNNQuery struct {
BoostVal *Boost `json:"boost,omitempty"`
// see KNNRequest.Params for description
Params json.RawMessage `json:"params"`
Params json.RawMessage `json:"params"`
FilterQuery Query `json:"filter,omitempty"`
filterResults []index.IndexInternalID
}
func NewKNNQuery(vector []float32) *KNNQuery {
@@ -67,6 +69,14 @@ func (q *KNNQuery) SetParams(params json.RawMessage) {
q.Params = params
}
// SetFilterQuery sets the pre-filter query whose hits restrict which
// documents are eligible for this kNN query.
func (q *KNNQuery) SetFilterQuery(f Query) {
q.FilterQuery = f
}

// SetFilterResults records the internal IDs of the documents that
// matched the filter query; the kNN search is limited to these.
func (q *KNNQuery) SetFilterResults(results []index.IndexInternalID) {
q.filterResults = results
}
func (q *KNNQuery) Searcher(ctx context.Context, i index.IndexReader,
m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
fieldMapping := m.FieldMappingForPath(q.VectorField)
@@ -77,6 +87,12 @@ func (q *KNNQuery) Searcher(ctx context.Context, i index.IndexReader,
if q.K <= 0 || len(q.Vector) == 0 {
return nil, fmt.Errorf("k must be greater than 0 and vector must be non-empty")
}
if similarityMetric == index.CosineSimilarity {
// normalize the vector
q.Vector = mapping.NormalizeVector(q.Vector)
}
return searcher.NewKNNSearcher(ctx, i, m, options, q.VectorField,
q.Vector, q.K, q.BoostVal.Value(), similarityMetric, q.Params)
q.Vector, q.K, q.BoostVal.Value(), similarityMetric, q.Params,
q.filterResults)
}

View File

@@ -49,11 +49,20 @@ type KNNSearcher struct {
func NewKNNSearcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping,
options search.SearcherOptions, field string, vector []float32, k int64,
boost float64, similarityMetric string, searchParams json.RawMessage) (
boost float64, similarityMetric string, searchParams json.RawMessage,
filterIDs []index.IndexInternalID) (
search.Searcher, error) {
if vr, ok := i.(index.VectorIndexReader); ok {
vectorReader, err := vr.VectorReader(ctx, vector, field, k, searchParams)
var vectorReader index.VectorReader
var err error
if len(filterIDs) > 0 {
vectorReader, err = vr.VectorReaderWithFilter(ctx, vector, field, k,
searchParams, filterIDs)
} else {
vectorReader, err = vr.VectorReader(ctx, vector, field, k, searchParams)
}
if err != nil {
return nil, err
}

View File

@@ -87,6 +87,10 @@ type KNNRequest struct {
//
// Consult go-faiss to know all supported search params
Params json.RawMessage `json:"params"`
// Filter query to use with kNN pre-filtering.
// Supports pre-filtering with all existing types of query clauses.
FilterQuery query.Query `json:"filter,omitempty"`
}
func (r *SearchRequest) AddKNN(field string, vector []float32, k int64, boost float64) {
@@ -99,6 +103,18 @@ func (r *SearchRequest) AddKNN(field string, vector []float32, k int64, boost fl
})
}
// AddKNNWithFilter appends a kNN request to the search request, like
// AddKNN, but with a pre-filter query: only documents matching
// filterQuery are considered by the nearest-neighbor search.
func (r *SearchRequest) AddKNNWithFilter(field string, vector []float32, k int64,
boost float64, filterQuery query.Query) {
b := query.Boost(boost)
r.KNN = append(r.KNN, &KNNRequest{
Field: field,
Vector: vector,
K: k,
Boost: &b,
FilterQuery: filterQuery,
})
}
func (r *SearchRequest) AddKNNOperator(operator knnOperator) {
r.KNNOperator = operator
}
@@ -106,6 +122,16 @@ func (r *SearchRequest) AddKNNOperator(operator knnOperator) {
// UnmarshalJSON deserializes a JSON representation of
// a SearchRequest
func (r *SearchRequest) UnmarshalJSON(input []byte) error {
type tempKNNReq struct {
Field string `json:"field"`
Vector []float32 `json:"vector"`
VectorBase64 string `json:"vector_base64"`
K int64 `json:"k"`
Boost *query.Boost `json:"boost,omitempty"`
Params json.RawMessage `json:"params"`
FilterQuery json.RawMessage `json:"filter,omitempty"`
}
var temp struct {
Q json.RawMessage `json:"query"`
Size *int `json:"size"`
@@ -119,7 +145,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error {
Score string `json:"score"`
SearchAfter []string `json:"search_after"`
SearchBefore []string `json:"search_before"`
KNN []*KNNRequest `json:"knn"`
KNN []*tempKNNReq `json:"knn"`
KNNOperator knnOperator `json:"knn_operator"`
PreSearchData json.RawMessage `json:"pre_search_data"`
}
@@ -163,7 +189,22 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error {
r.From = 0
}
r.KNN = temp.KNN
r.KNN = make([]*KNNRequest, len(temp.KNN))
for i, knnReq := range temp.KNN {
r.KNN[i] = &KNNRequest{}
r.KNN[i].Field = temp.KNN[i].Field
r.KNN[i].Vector = temp.KNN[i].Vector
r.KNN[i].VectorBase64 = temp.KNN[i].VectorBase64
r.KNN[i].K = temp.KNN[i].K
r.KNN[i].Boost = temp.KNN[i].Boost
r.KNN[i].Params = temp.KNN[i].Params
if len(knnReq.FilterQuery) == 0 {
// Setting this to nil to avoid ParseQuery() setting it to a match none
r.KNN[i].FilterQuery = nil
} else {
r.KNN[i].FilterQuery, err = query.ParseQuery(knnReq.FilterQuery)
}
}
r.KNNOperator = temp.KNNOperator
if r.KNNOperator == "" {
r.KNNOperator = knnOperatorOr
@@ -209,7 +250,9 @@ var (
knnOperatorOr = knnOperator("or")
)
func createKNNQuery(req *SearchRequest) (query.Query, []int64, int64, error) {
func createKNNQuery(req *SearchRequest, eligibleDocsMap map[int][]index.IndexInternalID,
requiresFiltering map[int]bool) (
query.Query, []int64, int64, error) {
if requestHasKNN(req) {
// first perform validation
err := validateKNN(req)
@@ -219,12 +262,25 @@ func createKNNQuery(req *SearchRequest) (query.Query, []int64, int64, error) {
var subQueries []query.Query
kArray := make([]int64, 0, len(req.KNN))
sumOfK := int64(0)
for _, knn := range req.KNN {
for i, knn := range req.KNN {
// If it's a filtered kNN but has no eligible filter hits, then
// do not run the kNN query.
if requiresFiltering[i] && len(eligibleDocsMap[i]) <= 0 {
continue
}
knnQuery := query.NewKNNQuery(knn.Vector)
knnQuery.SetFieldVal(knn.Field)
knnQuery.SetK(knn.K)
knnQuery.SetBoost(knn.Boost.Value())
knnQuery.SetParams(knn.Params)
if len(eligibleDocsMap[i]) > 0 {
knnQuery.SetFilterQuery(knn.FilterQuery)
filterResults, exists := eligibleDocsMap[i]
if exists {
knnQuery.SetFilterResults(filterResults)
}
}
subQueries = append(subQueries, knnQuery)
kArray = append(kArray, knn.K)
sumOfK += knn.K
@@ -303,7 +359,62 @@ func addSortAndFieldsToKNNHits(req *SearchRequest, knnHits []*search.DocumentMat
}
func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, reader index.IndexReader, preSearch bool) ([]*search.DocumentMatch, error) {
KNNQuery, kArray, sumOfK, err := createKNNQuery(req)
// maps the index of the KNN query in the req to the pre-filter hits aka
// eligible docs' internal IDs .
filterHitsMap := make(map[int][]index.IndexInternalID)
// Indicates if this query requires filtering downstream
// No filtering required if it's a match all query/no filters applied.
requiresFiltering := make(map[int]bool)
for idx, knnReq := range req.KNN {
// TODO Can use goroutines for this filter query stuff - do it if perf results
// show this to be significantly slow otherwise.
filterQ := knnReq.FilterQuery
if filterQ == nil {
requiresFiltering[idx] = false
continue
}
if _, ok := filterQ.(*query.MatchAllQuery); ok {
// Equivalent to not having a filter query.
requiresFiltering[idx] = false
continue
}
if _, ok := filterQ.(*query.MatchNoneQuery); ok {
// Filtering required since no hits are eligible.
requiresFiltering[idx] = true
// a match none query just means none the documents are eligible
// hence, we can save on running the query.
continue
}
// Applies to all supported types of queries.
filterSearcher, _ := filterQ.Searcher(ctx, reader, i.m, search.SearcherOptions{
Score: "none", // just want eligible hits --> don't compute scores if not needed
})
// Using the index doc count to determine collector size since we do not
// have an estimate of the number of eligible docs in the index yet.
indexDocCount, err := i.DocCount()
if err != nil {
return nil, err
}
filterColl := collector.NewEligibleCollector(int(indexDocCount))
err = filterColl.Collect(ctx, filterSearcher, reader)
if err != nil {
return nil, err
}
filterHits := filterColl.IDs()
if len(filterHits) > 0 {
filterHitsMap[idx] = filterHits
}
// set requiresFiltering regardless of whether there're filtered hits or
// not to later decide whether to consider the knnQuery or not
requiresFiltering[idx] = true
}
// Add the filter hits when creating the kNN query
KNNQuery, kArray, sumOfK, err := createKNNQuery(req, filterHitsMap, requiresFiltering)
if err != nil {
return nil, err
}

View File

@@ -57,6 +57,14 @@ type CopyIndex interface {
CopyReader() CopyReader
}
// EventIndex is an optional interface for exposing the support for firing event
// callbacks for various events in the index.
// NOTE(review): presumably discovered by callers via type assertion on the
// index implementation — confirm before relying on it.
type EventIndex interface {
	// FireIndexEvent is used to fire an event callback when Index() is called,
	// to notify the caller that a document has been added to the index.
	FireIndexEvent()
}
type IndexReader interface {
TermFieldReader(ctx context.Context, term []byte, field string, includeFreq, includeNorm, includeTermVectors bool) (TermFieldReader, error)

View File

@@ -32,12 +32,9 @@ type VectorField interface {
const (
EuclideanDistance = "l2_norm"
// dotProduct(vecA, vecB) = vecA . vecB = |vecA| * |vecB| * cos(theta);
// where, theta is the angle between vecA and vecB
// If vecA and vecB are normalized (unit magnitude), then
// vecA . vecB = cos(theta), which is the cosine similarity.
// Thus, we don't need a separate similarity type for cosine similarity
CosineSimilarity = "dot_product"
InnerProduct = "dot_product"
CosineSimilarity = "cosine"
)
const DefaultSimilarityMetric = EuclideanDistance
@@ -45,6 +42,7 @@ const DefaultSimilarityMetric = EuclideanDistance
// Supported similarity metrics for vector fields
var SupportedSimilarityMetrics = map[string]struct{}{
EuclideanDistance: {},
InnerProduct: {},
CosineSimilarity: {},
}

View File

@@ -48,8 +48,11 @@ type VectorReader interface {
}
type VectorIndexReader interface {
VectorReader(ctx context.Context, vector []float32, field string, k int64, searchParams json.RawMessage) (
VectorReader, error)
VectorReader(ctx context.Context, vector []float32, field string, k int64,
searchParams json.RawMessage) (VectorReader, error)
VectorReaderWithFilter(ctx context.Context, vector []float32, field string, k int64,
searchParams json.RawMessage, filterIDs []IndexInternalID) (VectorReader, error)
}
type VectorDoc struct {

View File

@@ -9,6 +9,7 @@ package faiss
#include <faiss/c_api/Index_c.h>
#include <faiss/c_api/error_c.h>
#include <faiss/c_api/utils/distances_c.h>
*/
import "C"
import "errors"
@@ -28,3 +29,13 @@ const (
MetricBrayCurtis = C.METRIC_BrayCurtis
MetricJensenShannon = C.METRIC_JensenShannon
)
// NormalizeVector performs in-place L2 normalization of the provided vector
// (treated as a single vector) and returns the same slice for convenience.
func NormalizeVector(vector []float32) []float32 {
	// Guard: &vector[0] on a zero-length slice would panic before the C call.
	if len(vector) == 0 {
		return vector
	}
	C.faiss_fvec_renorm_L2(
		C.size_t(len(vector)),
		1, // number of vectors
		(*C.float)(&vector[0]))
	return vector
}

View File

@@ -44,6 +44,15 @@ type Index interface {
// AddWithIDs is like Add, but stores xids instead of sequential IDs.
AddWithIDs(x []float32, xids []int64) error
// Applicable only to IVF indexes: Return a map of centroid ID --> []vector IDs
// for the cluster.
ObtainClusterToVecIDsFromIVFIndex() (ids map[int64][]int64, err error)
// Applicable only to IVF indexes: Returns the centroid IDs in decreasing order
// of proximity to query 'x' and their distance from 'x'
ObtainClustersWithDistancesFromIVFIndex(x []float32, centroidIDs []int64) (
[]int64, []float32, error)
// Search queries the index with the vectors in x.
// Returns the IDs of the k nearest neighbors for each query vector and the
// corresponding distances.
@@ -52,6 +61,14 @@ type Index interface {
SearchWithoutIDs(x []float32, k int64, exclude []int64, params json.RawMessage) (distances []float32,
labels []int64, err error)
SearchWithIDs(x []float32, k int64, include []int64, params json.RawMessage) (distances []float32,
labels []int64, err error)
// Applicable only to IVF indexes: Search clusters whose IDs are in eligibleCentroidIDs
SearchClustersFromIVFIndex(selector Selector, nvecs int, eligibleCentroidIDs []int64,
minEligibleCentroids int, k int64, x, centroidDis []float32,
params json.RawMessage) ([]float32, []int64, error)
Reconstruct(key int64) ([]float32, error)
ReconstructBatch(keys []int64, recons []float32) ([]float32, error)
@@ -123,6 +140,101 @@ func (idx *faissIndex) Add(x []float32) error {
return nil
}
// ObtainClusterToVecIDsFromIVFIndex returns, for an IVF index, a map of
// centroid (inverted-list) ID --> IDs of the vectors assigned to that
// cluster. Returns (nil, nil) when the index is not an IVF index.
func (idx *faissIndex) ObtainClusterToVecIDsFromIVFIndex() (map[int64][]int64, error) {
	// This type assertion is required to determine whether to invoke
	// ObtainClustersWithDistancesFromIVFIndex, SearchClustersFromIVFIndex or not.
	if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx == nil {
		return nil, nil
	}
	clusterVectorIDMap := make(map[int64][]int64)
	nlist := C.faiss_IndexIVF_nlist(idx.idx)
	for i := 0; i < int(nlist); i++ {
		listSize := C.faiss_IndexIVF_get_list_size(idx.idx, C.size_t(i))
		if listSize == 0 {
			// Record the empty cluster without touching &invlist[0],
			// which would panic on a zero-length slice.
			clusterVectorIDMap[int64(i)] = nil
			continue
		}
		invlist := make([]int64, listSize)
		C.faiss_IndexIVF_invlists_get_ids(idx.idx, C.size_t(i), (*C.idx_t)(&invlist[0]))
		clusterVectorIDMap[int64(i)] = invlist
	}
	return clusterVectorIDMap, nil
}
// ObtainClustersWithDistancesFromIVFIndex returns the centroids among
// 'centroidIDs' ordered by proximity to the query vector(s) in 'x', along
// with their distances from 'x'. Applicable only to IVF indexes.
func (idx *faissIndex) ObtainClustersWithDistancesFromIVFIndex(x []float32, centroidIDs []int64) (
	[]int64, []float32, error) {
	// Guard: the &x[0]/&centroids[0] accesses below would panic on empty input.
	if len(x) == 0 || len(centroidIDs) == 0 {
		return nil, nil, nil
	}
	// Selector to include only the centroids whose IDs are part of 'centroidIDs'.
	includeSelector, err := NewIDSelectorBatch(centroidIDs)
	if err != nil {
		return nil, nil, err
	}
	defer includeSelector.Delete()

	params, err := NewSearchParams(idx, json.RawMessage{}, includeSelector.Get())
	// NewSearchParams always returns an object that must be freed by the
	// caller via Delete(), even on error; the original leaked it here.
	defer params.Delete()
	if err != nil {
		return nil, nil, err
	}

	// Populate these with the centroids and their distances.
	centroids := make([]int64, len(centroidIDs))
	centroidDistances := make([]float32, len(centroidIDs))

	n := len(x) / idx.D()
	c := C.faiss_Search_closest_eligible_centroids(idx.idx, (C.int)(n),
		(*C.float)(&x[0]), (C.int)(len(centroidIDs)),
		(*C.float)(&centroidDistances[0]), (*C.idx_t)(&centroids[0]), params.sp)
	if c != 0 {
		return nil, nil, getLastError()
	}
	return centroids, centroidDistances, nil
}
// SearchClustersFromIVFIndex searches only the clusters whose IDs appear in
// 'eligibleCentroidIDs' (IVF indexes only). 'selector' restricts the
// candidate vectors and is consumed (freed) by this call.
func (idx *faissIndex) SearchClustersFromIVFIndex(selector Selector, nvecs int,
	eligibleCentroidIDs []int64, minEligibleCentroids int, k int64, x,
	centroidDis []float32, params json.RawMessage) ([]float32, []int64, error) {
	defer selector.Delete()

	tempParams := defaultSearchParamsIVF{
		Nlist: len(eligibleCentroidIDs),
		// Have to override nprobe so that more clusters will be searched for this
		// query, if required.
		Nprobe: minEligibleCentroids,
		Nvecs:  nvecs,
	}
	searchParams, err := NewSearchParamsIVF(idx, params, selector.Get(),
		tempParams)
	// Always free the search params — NewSearchParamsIVF returns a valid
	// object even on error; the original leaked it on every path. (Freeing
	// both the selector and the params mirrors SearchWithIDs.)
	defer searchParams.Delete()
	if err != nil {
		return nil, nil, err
	}

	n := len(x) / idx.D()
	distances := make([]float32, int64(n)*k)
	labels := make([]int64, int64(n)*k)

	// NOTE(review): assumes effectiveNprobe <= len(eligibleCentroidIDs) and
	// <= len(centroidDis); the reslicing below panics otherwise — confirm
	// with callers.
	effectiveNprobe := getNProbeFromSearchParams(searchParams)
	eligibleCentroidIDs = eligibleCentroidIDs[:effectiveNprobe]
	centroidDis = centroidDis[:effectiveNprobe]

	if c := C.faiss_IndexIVF_search_preassigned_with_params(
		idx.idx,
		(C.idx_t)(n),
		(*C.float)(&x[0]),
		(C.idx_t)(k),
		(*C.idx_t)(&eligibleCentroidIDs[0]),
		(*C.float)(&centroidDis[0]),
		(*C.float)(&distances[0]),
		(*C.idx_t)(&labels[0]),
		(C.int)(0),
		searchParams.sp); c != 0 {
		return nil, nil, getLastError()
	}
	return distances, labels, nil
}
func (idx *faissIndex) AddWithIDs(x []float32, xids []int64) error {
n := len(x) / idx.D()
if c := C.faiss_Index_add_with_ids(
@@ -139,7 +251,6 @@ func (idx *faissIndex) AddWithIDs(x []float32, xids []int64) error {
func (idx *faissIndex) Search(x []float32, k int64) (
distances []float32, labels []int64, err error,
) {
n := len(x) / idx.D()
distances = make([]float32, int64(n)*k)
labels = make([]int64, int64(n)*k)
@@ -170,7 +281,7 @@ func (idx *faissIndex) SearchWithoutIDs(x []float32, k int64, exclude []int64, p
if err != nil {
return nil, nil, err
}
selector = excludeSelector.sel
selector = excludeSelector.Get()
defer excludeSelector.Delete()
}
@@ -185,6 +296,25 @@ func (idx *faissIndex) SearchWithoutIDs(x []float32, k int64, exclude []int64, p
return
}
// SearchWithIDs runs a kNN search over the index restricted to the vector
// IDs listed in 'include', returning the distances and labels of the k
// nearest neighbors for each query vector in x.
func (idx *faissIndex) SearchWithIDs(x []float32, k int64, include []int64,
	params json.RawMessage) (distances []float32, labels []int64, err error,
) {
	// Restrict the search to the provided IDs via a batch selector.
	sel, err := NewIDSelectorBatch(include)
	if err != nil {
		return nil, nil, err
	}
	defer sel.Delete()

	sp, err := NewSearchParams(idx, params, sel.Get())
	if err != nil {
		return nil, nil, err
	}
	defer sp.Delete()

	return idx.searchWithParams(x, k, sp.sp)
}
func (idx *faissIndex) Reconstruct(key int64) (recons []float32, err error) {
rv := make([]float32, idx.D())
if c := C.faiss_Index_reconstruct(

View File

@@ -51,3 +51,11 @@ func (idx *IndexImpl) SetNProbe(nprobe int32) {
}
C.faiss_IndexIVF_set_nprobe(ivfPtr, C.size_t(nprobe))
}
// GetNProbe reports the current nprobe setting of the underlying IVF index,
// or 0 when the index is not an IVF index.
func (idx *IndexImpl) GetNProbe() int32 {
	if ivfPtr := C.faiss_IndexIVF_cast(idx.cPtr()); ivfPtr != nil {
		return int32(C.faiss_IndexIVF_nprobe(ivfPtr))
	}
	return 0
}

View File

@@ -28,6 +28,15 @@ type searchParamsIVF struct {
MaxCodesPct float32 `json:"ivf_max_codes_pct,omitempty"`
}
// IVF Parameters used to override the index-time defaults for a specific query.
// Serve as the 'new' defaults for this query, unless overridden by search-time
// params.
type defaultSearchParamsIVF struct {
	Nprobe int `json:"ivf_nprobe,omitempty"` // number of inverted lists to probe for this query
	Nlist  int `json:"ivf_nlist,omitempty"`  // number of inverted lists (clusters) to assume for this query
	Nvecs  int `json:"ivf_nvecs,omitempty"`  // number of vectors to assume for this query
}
func (s *searchParamsIVF) Validate() error {
if s.NprobePct < 0 || s.NprobePct > 100 {
return fmt.Errorf("invalid IVF search params, ivf_nprobe_pct:%v, "+
@@ -42,6 +51,70 @@ func (s *searchParamsIVF) Validate() error {
return nil
}
// getNProbeFromSearchParams reads the effective nprobe value back out of an
// already-constructed (IVF) SearchParams.
func getNProbeFromSearchParams(params *SearchParams) int32 {
	return int32(C.faiss_SearchParametersIVF_nprobe(params.sp))
}
// NewSearchParamsIVF constructs a SearchParams for an IVF index, layering
// three levels of configuration: index-time values (nlist/nprobe read from
// the index), per-query overrides in defaultParams, and finally any
// user-supplied JSON search-time params (highest precedence).
//
// Like NewSearchParams, the returned object should be cleaned up by the
// caller via Delete(), even when an error is returned.
func NewSearchParamsIVF(idx Index, params json.RawMessage, sel *C.FaissIDSelector,
	defaultParams defaultSearchParamsIVF) (*SearchParams, error) {
	rv := &SearchParams{}
	if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx != nil {
		// NOTE(review): rv.sp is still nil at this point, so this cast
		// looks like a no-op; compare NewSearchParams, which allocates
		// rv.sp before casting. Confirm this line is intentional.
		rv.sp = C.faiss_SearchParametersIVF_cast(rv.sp)
		// Nothing to customize: no JSON params and no ID selector.
		if len(params) == 0 && sel == nil {
			return rv, nil
		}
		var nprobe, maxCodes, nlist int
		nlist = int(C.faiss_IndexIVF_nlist(ivfIdx))
		// It's important to set nprobe to the value decided at the time of
		// index creation. Otherwise, nprobe will be set to the default
		// value of 1.
		nprobe = int(C.faiss_IndexIVF_nprobe(ivfIdx))
		nvecs := idx.Ntotal()
		// Per-query defaults (when set) take precedence over the
		// index-time values read above.
		if defaultParams.Nvecs > 0 {
			nvecs = int64(defaultParams.Nvecs)
		}
		if defaultParams.Nlist > 0 {
			nlist = defaultParams.Nlist
		}
		if defaultParams.Nprobe > 0 {
			nprobe = defaultParams.Nprobe
		}
		// Search-time JSON params override everything else.
		var ivfParams searchParamsIVF
		if len(params) > 0 {
			if err := json.Unmarshal(params, &ivfParams); err != nil {
				return rv, fmt.Errorf("failed to unmarshal IVF search params, "+
					"err:%v", err)
			}
			if err := ivfParams.Validate(); err != nil {
				return rv, err
			}
		}
		if ivfParams.NprobePct > 0 {
			// in the situation when the calculated nprobe happens to be
			// between 0 and 1, we'll round it up.
			nprobe = max(int(float32(nlist)*(ivfParams.NprobePct/100)), 1)
		}
		if ivfParams.MaxCodesPct > 0 {
			maxCodes = int(float32(nvecs) * (ivfParams.MaxCodesPct / 100))
		} // else, maxCodes will be set to the default value of 0, which means no limit
		if c := C.faiss_SearchParametersIVF_new_with(
			&rv.sp,
			sel,
			C.size_t(nprobe),
			C.size_t(maxCodes),
		); c != 0 {
			return rv, fmt.Errorf("failed to create faiss IVF search params")
		}
	}
	return rv, nil
}
// Always return a valid SearchParams object,
// thus caller must clean up the object
// by invoking Delete() method, even if an error is returned.
@@ -52,29 +125,33 @@ func NewSearchParams(idx Index, params json.RawMessage, sel *C.FaissIDSelector,
return rv, fmt.Errorf("failed to create faiss search params")
}
// # check if the index is IVF and set the search params
// check if the index is IVF and set the search params
if ivfIdx := C.faiss_IndexIVF_cast(idx.cPtr()); ivfIdx != nil {
rv.sp = C.faiss_SearchParametersIVF_cast(rv.sp)
if len(params) == 0 {
if len(params) == 0 && sel == nil {
return rv, nil
}
var ivfParams searchParamsIVF
if err := json.Unmarshal(params, &ivfParams); err != nil {
return rv, fmt.Errorf("failed to unmarshal IVF search params, "+
"err:%v", err)
}
if err := ivfParams.Validate(); err != nil {
return rv, err
if len(params) > 0 {
if err := json.Unmarshal(params, &ivfParams); err != nil {
return rv, fmt.Errorf("failed to unmarshal IVF search params, "+
"err:%v", err)
}
if err := ivfParams.Validate(); err != nil {
return rv, err
}
}
var nprobe, maxCodes int
if ivfParams.NprobePct > 0 {
nlist := float32(C.faiss_IndexIVF_nlist(ivfIdx))
nprobe = int(nlist * (ivfParams.NprobePct / 100))
// in the situation when the calculated nprobe happens to be
// between 0 and 1, we'll round it up.
nprobe = max(int(nlist*(ivfParams.NprobePct/100)), 1)
} else {
// It's important to set nprobe to the value decided at the time of
// it's important to set nprobe to the value decided at the time of
// index creation. Otherwise, nprobe will be set to the default
// value of 1.
nprobe = int(C.faiss_IndexIVF_nprobe(ivfIdx))

View File

@@ -5,6 +5,11 @@ package faiss
*/
import "C"
type Selector interface {
Get() *C.FaissIDSelector
Delete()
}
// IDSelector represents a set of IDs to remove.
type IDSelector struct {
sel *C.FaissIDSelector
@@ -19,13 +24,17 @@ func (s *IDSelector) Delete() {
C.faiss_IDSelector_free(s.sel)
}
type IDSelectorBatch struct {
func (s *IDSelector) Get() *C.FaissIDSelector {
return s.sel
}
type IDSelectorNot struct {
sel *C.FaissIDSelector
batchSel *C.FaissIDSelector
}
// Delete frees the memory associated with s.
func (s *IDSelectorBatch) Delete() {
func (s *IDSelectorNot) Delete() {
if s == nil {
return
}
@@ -38,8 +47,12 @@ func (s *IDSelectorBatch) Delete() {
}
}
func (s *IDSelectorNot) Get() *C.FaissIDSelector {
return s.sel
}
// NewIDSelectorRange creates a selector that removes IDs on [imin, imax).
func NewIDSelectorRange(imin, imax int64) (*IDSelector, error) {
func NewIDSelectorRange(imin, imax int64) (Selector, error) {
var sel *C.FaissIDSelectorRange
c := C.faiss_IDSelectorRange_new(&sel, C.idx_t(imin), C.idx_t(imax))
if c != 0 {
@@ -49,7 +62,7 @@ func NewIDSelectorRange(imin, imax int64) (*IDSelector, error) {
}
// NewIDSelectorBatch creates a new batch selector.
func NewIDSelectorBatch(indices []int64) (*IDSelector, error) {
func NewIDSelectorBatch(indices []int64) (Selector, error) {
var sel *C.FaissIDSelectorBatch
if c := C.faiss_IDSelectorBatch_new(
&sel,
@@ -61,9 +74,9 @@ func NewIDSelectorBatch(indices []int64) (*IDSelector, error) {
return &IDSelector{(*C.FaissIDSelector)(sel)}, nil
}
// NewIDSelectorNot creates a new Not selector, wrapped arround a
// NewIDSelectorNot creates a new Not selector, wrapped around a
// batch selector, with the IDs in 'exclude'.
func NewIDSelectorNot(exclude []int64) (*IDSelectorBatch, error) {
func NewIDSelectorNot(exclude []int64) (Selector, error) {
batchSelector, err := NewIDSelectorBatch(exclude)
if err != nil {
return nil, err
@@ -72,10 +85,11 @@ func NewIDSelectorNot(exclude []int64) (*IDSelectorBatch, error) {
var sel *C.FaissIDSelectorNot
if c := C.faiss_IDSelectorNot_new(
&sel,
batchSelector.sel,
batchSelector.Get(),
); c != 0 {
batchSelector.Delete()
return nil, getLastError()
}
return &IDSelectorBatch{sel: (*C.FaissIDSelector)(sel), batchSel: batchSelector.sel}, nil
return &IDSelectorNot{sel: (*C.FaissIDSelector)(sel),
batchSel: batchSelector.Get()}, nil
}

View File

@@ -59,13 +59,17 @@ type VecPostingsIterator interface {
type VectorIndex interface {
// @params: Search params for backing vector index (like IVF, HNSW, etc.)
Search(qVector []float32, k int64, params json.RawMessage) (VecPostingsList, error)
// @eligibleDocIDs: DocIDs in the segment eligible for the kNN query.
SearchWithFilter(qVector []float32, k int64, eligibleDocIDs []uint64,
params json.RawMessage) (VecPostingsList, error)
Close()
Size() uint64
}
type VectorSegment interface {
Segment
InterpretVectorIndex(field string, except *roaring.Bitmap) (VectorIndex, error)
InterpretVectorIndex(field string, requiresFiltering bool, except *roaring.Bitmap) (
VectorIndex, error)
}
type VecPosting interface {

View File

@@ -15,6 +15,7 @@
package zap
import (
"errors"
"fmt"
)
@@ -26,8 +27,18 @@ var LegacyChunkMode uint32 = 1024
// be used by default.
var DefaultChunkMode uint32 = 1026
var ErrChunkSizeZero = errors.New("chunk size is zero")
// getChunkSize returns the chunk size for the given chunkMode, cardinality, and
// maxDocs.
//
// In error cases, the returned chunk size will be 0. Caller can differentiate
// between a valid chunk size of 0 and an error by checking for ErrChunkSizeZero.
func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, error) {
switch {
case chunkMode == 0:
return 0, ErrChunkSizeZero
// any chunkMode <= 1024 will always chunk with chunkSize=chunkMode
case chunkMode <= 1024:
// legacy chunk size
@@ -46,6 +57,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64,
// chunk-size items.
// no attempt is made to tweak any other case
if cardinality <= 1024 {
if maxDocs == 0 {
return 0, ErrChunkSizeZero
}
return maxDocs, nil
}
return 1024, nil
@@ -61,6 +75,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64,
// 2. convert to chunkSize, dividing into maxDocs
numChunks := (cardinality / 1024) + 1
chunkSize := maxDocs / numChunks
if chunkSize == 0 {
return 0, ErrChunkSizeZero
}
return chunkSize, nil
}
return 0, fmt.Errorf("unknown chunk mode %d", chunkMode)

View File

@@ -19,7 +19,6 @@ import (
"encoding/binary"
"io"
"reflect"
"sync/atomic"
"github.com/golang/snappy"
)
@@ -37,7 +36,7 @@ var (
)
type chunkedContentCoder struct {
bytesWritten uint64 // atomic access to this variable, moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS.
bytesWritten uint64 // moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS.
final []byte
chunkSize uint64
@@ -112,11 +111,11 @@ func (c *chunkedContentCoder) Close() error {
}
func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}
func (c *chunkedContentCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}
func (c *chunkedContentCoder) flushContents() error {

View File

@@ -75,7 +75,6 @@ type docValueReader struct {
curChunkData []byte // compressed data cache
uncompressed []byte // temp buf for snappy decompression
// atomic access to this variable
bytesRead uint64
}

View File

@@ -27,7 +27,6 @@ type chunkedIntDecoder struct {
data []byte
r *memUvarintReader
// atomic access to this variable
bytesRead uint64
}

View File

@@ -18,7 +18,6 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
)
// We can safely use 0 to represent termNotEncoded since 0
@@ -36,7 +35,6 @@ type chunkedIntCoder struct {
buf []byte
// atomic access to this variable
bytesWritten uint64
}
@@ -79,11 +77,11 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
}
func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}
func (c *chunkedIntCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}
// Add encodes the provided integers into the correct chunk for the provided

View File

@@ -591,6 +591,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo
use1HitEncoding func(uint64) (bool, uint64, uint64),
w *CountHashWriter, bufMaxVarintLen64 []byte) (
offset uint64, err error) {
if postings == nil {
return 0, nil
}
termCardinality := postings.GetCardinality()
if termCardinality <= 0 {
return 0, nil

View File

@@ -147,7 +147,6 @@ type interim struct {
lastNumDocs int
lastOutSize int
// atomic access to this variable
bytesWritten uint64
}
@@ -497,11 +496,11 @@ func (s *interim) processDocument(docNum uint64,
}
func (s *interim) getBytesWritten() uint64 {
return atomic.LoadUint64(&s.bytesWritten)
return s.bytesWritten
}
func (s *interim) incrementBytesWritten(val uint64) {
atomic.AddUint64(&s.bytesWritten, val)
s.bytesWritten += val
}
func (s *interim) writeStoredFields() (
@@ -667,7 +666,11 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
locs := s.Locs[pid]
locOffset := 0
chunkSize, err := getChunkSize(s.chunkMode, postingsBS.GetCardinality(), uint64(len(s.results)))
var cardinality uint64
if postingsBS != nil {
cardinality = postingsBS.GetCardinality()
}
chunkSize, err := getChunkSize(s.chunkMode, cardinality, uint64(len(s.results)))
if err != nil {
return 0, nil, err
}

View File

@@ -305,10 +305,7 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
chunkSize, err := getChunkSize(d.sb.chunkMode,
rv.postings.GetCardinality(), d.sb.numDocs)
if err != nil {
return err
} else if chunkSize == 0 {
return fmt.Errorf("chunk size is zero, chunkMode: %v, numDocs: %v",
d.sb.chunkMode, d.sb.numDocs)
return fmt.Errorf("failed to get chunk size: %v", err)
}
rv.chunkSize = chunkSize
@@ -632,6 +629,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool,
return i.nextDocNumAtOrAfterClean(atOrAfter)
}
if i.postings.chunkSize == 0 {
return 0, false, ErrChunkSizeZero
}
i.Actual.AdvanceIfNeeded(uint32(atOrAfter))
if !i.Actual.HasNext() || !i.all.HasNext() {
@@ -741,6 +742,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfterClean(
return uint64(i.Actual.Next()), true, nil
}
if i.postings != nil && i.postings.chunkSize == 0 {
return 0, false, ErrChunkSizeZero
}
// freq-norm's needed, so maintain freq-norm chunk reader
sameChunkNexts := 0 // # of times we called Next() in the same chunk
n := i.Actual.Next()

View File

@@ -157,24 +157,24 @@ func persistStoredFieldValues(fieldID int,
return curr, data, nil
}
func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, dictLocs []uint64,
sectionsIndexOffset uint64) (*SegmentBase, error) {
func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
sb := &SegmentBase{
mem: mem,
memCRC: memCRC,
chunkMode: chunkMode,
fieldsMap: fieldsMap,
numDocs: numDocs,
storedIndexOffset: storedIndexOffset,
fieldsIndexOffset: sectionsIndexOffset,
sectionsIndexOffset: sectionsIndexOffset,
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
docValueOffset: 0, // docValueOffsets identified automatically by the section
dictLocs: dictLocs,
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
// following fields gets populated by loadFieldsNew
fieldsMap: make(map[string]uint16),
dictLocs: make([]uint64, 0),
fieldsInv: make([]string, 0),
}
sb.updateSize()

View File

@@ -15,6 +15,7 @@
package zap
import (
"errors"
"fmt"
)
@@ -26,8 +27,18 @@ var LegacyChunkMode uint32 = 1024
// be used by default.
var DefaultChunkMode uint32 = 1026
var ErrChunkSizeZero = errors.New("chunk size is zero")
// getChunkSize returns the chunk size for the given chunkMode, cardinality, and
// maxDocs.
//
// In error cases, the returned chunk size will be 0. Caller can differentiate
// between a valid chunk size of 0 and an error by checking for ErrChunkSizeZero.
func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64, error) {
switch {
case chunkMode == 0:
return 0, ErrChunkSizeZero
// any chunkMode <= 1024 will always chunk with chunkSize=chunkMode
case chunkMode <= 1024:
// legacy chunk size
@@ -46,6 +57,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64,
// chunk-size items.
// no attempt is made to tweak any other case
if cardinality <= 1024 {
if maxDocs == 0 {
return 0, ErrChunkSizeZero
}
return maxDocs, nil
}
return 1024, nil
@@ -61,6 +75,9 @@ func getChunkSize(chunkMode uint32, cardinality uint64, maxDocs uint64) (uint64,
// 2. convert to chunkSize, dividing into maxDocs
numChunks := (cardinality / 1024) + 1
chunkSize := maxDocs / numChunks
if chunkSize == 0 {
return 0, ErrChunkSizeZero
}
return chunkSize, nil
}
return 0, fmt.Errorf("unknown chunk mode %d", chunkMode)

View File

@@ -19,7 +19,6 @@ import (
"encoding/binary"
"io"
"reflect"
"sync/atomic"
"github.com/golang/snappy"
)
@@ -37,7 +36,7 @@ var (
)
type chunkedContentCoder struct {
bytesWritten uint64 // atomic access to this variable, moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS.
bytesWritten uint64 // moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS.
final []byte
chunkSize uint64
@@ -112,11 +111,11 @@ func (c *chunkedContentCoder) Close() error {
}
func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}
func (c *chunkedContentCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}
func (c *chunkedContentCoder) flushContents() error {

View File

@@ -75,7 +75,6 @@ type docValueReader struct {
curChunkData []byte // compressed data cache
uncompressed []byte // temp buf for snappy decompression
// atomic access to this variable
bytesRead uint64
}

View File

@@ -52,54 +52,97 @@ func (vc *vectorIndexCache) Clear() {
vc.m.Unlock()
}
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
var found bool
index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except)
if !found {
index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except)
}
return index, vecDocIDMap, vecIDsToExclude, err
// loadDocVecIDMap indicates if a non-nil docVecIDMap should be returned.
// It is true when a filtered kNN query accesses the cache since it requires the
// map. It's false otherwise.
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64,
vecIDsToExclude []int64, err error) {
index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache(
fieldID, loadDocVecIDMap, mem, except)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err
}
func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, found bool) {
// function to load the vectorDocIDMap and if required, docVecIDMap from cache
// If not, it will create these and add them to the cache.
func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,
mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
vc.m.RLock()
defer vc.m.RUnlock()
entry, ok := vc.cache[fieldID]
if !ok {
return nil, nil, nil, false
}
index, vecDocIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
return index, vecDocIDMap, vecIDsToExclude, true
}
func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
vc.m.Lock()
defer vc.m.Unlock()
// when there are multiple threads trying to build the index, guard redundant
// index creation by doing a double check and return if already created and
// cached.
entry, ok := vc.cache[fieldID]
if ok {
index, vecDocIDMap = entry.load()
index, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
return index, vecDocIDMap, vecIDsToExclude, nil
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
vc.m.RUnlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
vc.m.RUnlock()
vc.m.Lock()
// in cases where only the docVecID isn't part of the cache, build it and
// add it to the cache, while holding a lock to avoid concurrent modifications.
// typically seen for the first filtered query.
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
vc.m.Unlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
// if the cache doesn't have entry, construct the vector to doc id map and the
// vector index out of the mem bytes and update the cache under lock.
vc.m.RUnlock()
// acquiring a lock since this is modifying the cache.
vc.m.Lock()
defer vc.m.Unlock()
return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except)
}
// addDocVecIDMapToCacheLOCKED builds the docID --> vector IDs reverse map
// from ce.vecDocIDMap and stores it on the cache entry. The caller must hold
// the cache write lock. Returns the (possibly pre-existing) map.
func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 {
	// Another writer may have built the map while we waited for the lock;
	// reuse it instead of redoing the work.
	if m := ce.docVecIDMap; m != nil {
		return m
	}
	m := make(map[uint32][]int64)
	for vec, doc := range ce.vecDocIDMap {
		m[doc] = append(m[doc], vec)
	}
	ce.docVecIDMap = m
	return m
}
// Rebuilding the cache on a miss.
func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
// Handle concurrent accesses (to avoid unnecessary work) by adding a
// check within the write lock here.
entry := vc.cache[fieldID]
if entry != nil {
index, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
// if the cache doesn't have the entry, construct the vector to doc id map and
// the vector index out of the mem bytes and update the cache under lock.
pos := 0
numVecs, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
pos += n
vecDocIDMap = make(map[int64]uint32, numVecs)
if loadDocVecIDMap {
docVecIDMap = make(map[uint32][]int64, numVecs)
}
isExceptNotEmpty := except != nil && !except.IsEmpty()
for i := 0; i < int(numVecs); i++ {
vecID, n := binary.Varint(mem[pos : pos+binary.MaxVarintLen64])
@@ -113,6 +156,9 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r
continue
}
vecDocIDMap[vecID] = docIDUint32
if loadDocVecIDMap {
docVecIDMap[docIDUint32] = append(docVecIDMap[docIDUint32], vecID)
}
}
indexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
@@ -120,15 +166,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r
index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
vc.insertLOCKED(fieldID, index, vecDocIDMap)
return index, vecDocIDMap, vecIDsToExclude, nil
vc.insertLOCKED(fieldID, index, vecDocIDMap, loadDocVecIDMap, docVecIDMap)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) {
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, loadDocVecIDMap bool,
docVecIDMap map[uint32][]int64) {
// the first time we've hit the cache, try to spawn a monitoring routine
// which will reconcile the moving averages for all the fields being hit
if len(vc.cache) == 0 {
@@ -142,7 +189,8 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
// this makes the average to be kept above the threshold value for a
// longer time and thereby the index to be resident in the cache
// for longer time.
vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap, 0.4)
vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap,
loadDocVecIDMap, docVecIDMap, 0.4)
}
}
@@ -235,8 +283,9 @@ func (e *ewma) add(val uint64) {
// -----------------------------------------------------------------------------
func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, alpha float64) *cacheEntry {
return &cacheEntry{
func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
loadDocVecIDMap bool, docVecIDMap map[uint32][]int64, alpha float64) *cacheEntry {
ce := &cacheEntry{
index: index,
vecDocIDMap: vecDocIDMap,
tracker: &ewma{
@@ -245,6 +294,10 @@ func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, alph
},
refs: 1,
}
if loadDocVecIDMap {
ce.docVecIDMap = docVecIDMap
}
return ce
}
type cacheEntry struct {
@@ -257,6 +310,7 @@ type cacheEntry struct {
index *faiss.IndexImpl
vecDocIDMap map[int64]uint32
docVecIDMap map[uint32][]int64
}
func (ce *cacheEntry) incHit() {
@@ -271,10 +325,10 @@ func (ce *cacheEntry) decRef() {
atomic.AddInt64(&ce.refs, -1)
}
func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32) {
func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32, map[uint32][]int64) {
ce.incHit()
ce.addRef()
return ce.index, ce.vecDocIDMap
return ce.index, ce.vecDocIDMap, ce.docVecIDMap
}
func (ce *cacheEntry) close() {
@@ -282,6 +336,7 @@ func (ce *cacheEntry) close() {
ce.index.Close()
ce.index = nil
ce.vecDocIDMap = nil
ce.docVecIDMap = nil
}()
}

View File

@@ -134,12 +134,15 @@ func (p *VecPostingsList) Size() int {
}
func (p *VecPostingsList) Count() uint64 {
n := p.postings.GetCardinality()
var e uint64
if p.except != nil {
e = p.postings.AndCardinality(p.except)
if p.postings != nil {
n := p.postings.GetCardinality()
var e uint64
if p.except != nil {
e = p.postings.AndCardinality(p.except)
}
return n - e
}
return n - e
return 0
}
func (vpl *VecPostingsList) ResetBytesRead(val uint64) {
@@ -267,16 +270,26 @@ func (vpl *VecPostingsIterator) BytesWritten() uint64 {
// vectorIndexWrapper conforms to scorch_segment_api's VectorIndex interface
type vectorIndexWrapper struct {
search func(qVector []float32, k int64, params json.RawMessage) (segment.VecPostingsList, error)
close func()
size func() uint64
search func(qVector []float32, k int64,
params json.RawMessage) (segment.VecPostingsList, error)
searchWithFilter func(qVector []float32, k int64, eligibleDocIDs []uint64,
params json.RawMessage) (segment.VecPostingsList, error)
close func()
size func() uint64
}
func (i *vectorIndexWrapper) Search(qVector []float32, k int64, params json.RawMessage) (
func (i *vectorIndexWrapper) Search(qVector []float32, k int64,
params json.RawMessage) (
segment.VecPostingsList, error) {
return i.search(qVector, k, params)
}
func (i *vectorIndexWrapper) SearchWithFilter(qVector []float32, k int64,
eligibleDocIDs []uint64, params json.RawMessage) (
segment.VecPostingsList, error) {
return i.searchWithFilter(qVector, k, eligibleDocIDs, params)
}
func (i *vectorIndexWrapper) Close() {
i.close()
}
@@ -288,20 +301,40 @@ func (i *vectorIndexWrapper) Size() uint64 {
// InterpretVectorIndex returns a construct of closures (vectorIndexWrapper)
// that will allow the caller to -
// (1) search within an attached vector index
// (2) close attached vector index
// (3) get the size of the attached vector index
func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap) (
// (2) search limited to a subset of documents within an attached vector index
// (3) close attached vector index
// (4) get the size of the attached vector index
func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool,
except *roaring.Bitmap) (
segment.VectorIndex, error) {
// Params needed for the closures
var vecIndex *faiss.IndexImpl
var vecDocIDMap map[int64]uint32
var docVecIDMap map[uint32][]int64
var vectorIDsToExclude []int64
var fieldIDPlus1 uint16
var vecIndexSize uint64
// Utility function to add the corresponding docID and scores for each vector
// returned after the kNN query to the newly
// created vecPostingsList
addIDsToPostingsList := func(pl *VecPostingsList, ids []int64, scores []float32) {
for i := 0; i < len(ids); i++ {
vecID := ids[i]
// Checking if it's present in the vecDocIDMap.
// If -1 is returned as an ID(insufficient vectors), this will ensure
// it isn't added to the final postings list.
if docID, ok := vecDocIDMap[vecID]; ok {
code := getVectorCode(docID, scores[i])
pl.postings.Add(uint64(code))
}
}
}
var (
wrapVecIndex = &vectorIndexWrapper{
search: func(qVector []float32, k int64, params json.RawMessage) (segment.VecPostingsList, error) {
search: func(qVector []float32, k int64, params json.RawMessage) (
segment.VecPostingsList, error) {
// 1. returned postings list (of type PostingsList) has two types of information - docNum and its score.
// 2. both the values can be represented using roaring bitmaps.
// 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings.
@@ -318,23 +351,204 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
return rv, nil
}
scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, vectorIDsToExclude, params)
scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k,
vectorIDsToExclude, params)
if err != nil {
return nil, err
}
// for every similar vector returned by the Search() API, add the corresponding
// docID and the score to the newly created vecPostingsList
for i := 0; i < len(ids); i++ {
vecID := ids[i]
// Checking if it's present in the vecDocIDMap.
// If -1 is returned as an ID(insufficient vectors), this will ensure
// it isn't added to the final postings list.
if docID, ok := vecDocIDMap[vecID]; ok {
code := getVectorCode(docID, scores[i])
rv.postings.Add(uint64(code))
}
addIDsToPostingsList(rv, ids, scores)
return rv, nil
},
searchWithFilter: func(qVector []float32, k int64,
eligibleDocIDs []uint64, params json.RawMessage) (
segment.VecPostingsList, error) {
// 1. returned postings list (of type PostingsList) has two types of information - docNum and its score.
// 2. both the values can be represented using roaring bitmaps.
// 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings.
// 4. VecPostings would just have the docNum and the score. Every call of Next()
// and Advance just returns the next VecPostings. The caller would do a vp.Number()
// and the Score() to get the corresponding values
rv := &VecPostingsList{
except: nil, // todo: handle the except bitmap within postings iterator.
postings: roaring64.New(),
}
if vecIndex == nil || vecIndex.D() != len(qVector) {
// vector index not found or dimensionality mismatched
return rv, nil
}
if len(eligibleDocIDs) > 0 {
// Non-zero documents eligible per the filter query.
// If every element in the index is eligible(eg. high selectivity
// cases), then this can basically be considered unfiltered kNN.
if len(eligibleDocIDs) == int(sb.numDocs) {
scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k,
vectorIDsToExclude, params)
if err != nil {
return nil, err
}
addIDsToPostingsList(rv, ids, scores)
return rv, nil
}
// vector IDs corresponding to the local doc numbers to be
// considered for the search
vectorIDsToInclude := make([]int64, 0, len(eligibleDocIDs))
for _, id := range eligibleDocIDs {
vectorIDsToInclude = append(vectorIDsToInclude, docVecIDMap[uint32(id)]...)
}
// Retrieve the mapping of centroid IDs to vectors within
// the cluster.
clusterAssignment, _ := vecIndex.ObtainClusterToVecIDsFromIVFIndex()
// Accounting for a flat index
if len(clusterAssignment) == 0 {
scores, ids, err := vecIndex.SearchWithIDs(qVector, k,
vectorIDsToInclude, params)
if err != nil {
return nil, err
}
addIDsToPostingsList(rv, ids, scores)
return rv, nil
}
// Converting to roaring bitmap for ease of intersect ops with
// the set of eligible doc IDs.
centroidVecIDMap := make(map[int64]*roaring.Bitmap)
for centroidID, vecIDs := range clusterAssignment {
if _, exists := centroidVecIDMap[centroidID]; !exists {
centroidVecIDMap[centroidID] = roaring.NewBitmap()
}
vecIDsUint32 := make([]uint32, 0, len(vecIDs))
for _, vecID := range vecIDs {
vecIDsUint32 = append(vecIDsUint32, uint32(vecID))
}
centroidVecIDMap[centroidID].AddMany(vecIDsUint32)
}
// Determining which clusters, identified by centroid ID,
// have at least one eligible vector and hence, ought to be
// probed.
eligibleCentroidIDs := make([]int64, 0)
var selector faiss.Selector
var err error
// If there are more elements to be included than excluded, it
// might be quicker to use an exclusion selector as a filter
// instead of an inclusion selector.
if float32(len(eligibleDocIDs))/float32(len(docVecIDMap)) > 0.5 {
ineligibleVecIDsBitmap := roaring.NewBitmap()
eligibleDocIDsMap := make(map[uint64]struct{})
for _, eligibleDocID := range eligibleDocIDs {
eligibleDocIDsMap[(eligibleDocID)] = struct{}{}
}
ineligibleVectorIDs := make([]int64, 0, len(vecDocIDMap)-
len(vectorIDsToInclude))
for docID, vecIDs := range docVecIDMap {
if _, exists := eligibleDocIDsMap[uint64(docID)]; !exists {
for _, vecID := range vecIDs {
ineligibleVecIDsBitmap.Add(uint32(vecID))
ineligibleVectorIDs = append(ineligibleVectorIDs, vecID)
}
}
}
for centroidID, vecIDs := range centroidVecIDMap {
vecIDs.AndNot(ineligibleVecIDsBitmap)
// At least one eligible vec in cluster.
if !vecIDs.IsEmpty() {
// The mapping is now reduced to those vectors which
// are also eligible docs for the filter query.
centroidVecIDMap[centroidID] = vecIDs
eligibleCentroidIDs = append(eligibleCentroidIDs, centroidID)
} else {
// don't consider clusters with no eligible IDs.
delete(centroidVecIDMap, centroidID)
}
}
selector, err = faiss.NewIDSelectorNot(ineligibleVectorIDs)
} else {
// Getting the vector IDs corresponding to the eligible
// doc IDs.
// The docVecIDMap maps each docID to vectorIDs corresponding
// to it.
// Usually, each docID has one vecID mapped to it unless
// the vector is nested, in which case there can be multiple
// vectorIDs mapped to the same docID.
// Eg. docID d1 -> vecID v1, for the first case
// d1 -> {v1,v2}, for the second case.
eligibleVecIDsBitmap := roaring.NewBitmap()
vecIDsUint32 := make([]uint32, 0)
for _, eligibleDocID := range eligibleDocIDs {
vecIDs := docVecIDMap[uint32(eligibleDocID)]
for _, vecID := range vecIDs {
vecIDsUint32 = append(vecIDsUint32, uint32(vecID))
}
}
eligibleVecIDsBitmap.AddMany(vecIDsUint32)
for centroidID, vecIDs := range centroidVecIDMap {
vecIDs.And(eligibleVecIDsBitmap)
if !vecIDs.IsEmpty() {
// The mapping is now reduced to those vectors which
// are also eligible docs for the filter query.
centroidVecIDMap[centroidID] = vecIDs
eligibleCentroidIDs = append(eligibleCentroidIDs, centroidID)
} else {
// don't consider clusters with no eligible IDs.
delete(centroidVecIDMap, centroidID)
}
}
selector, err = faiss.NewIDSelectorBatch(vectorIDsToInclude)
}
if err != nil {
return nil, err
}
// Ordering the retrieved centroid IDs by increasing order
// of distance i.e. decreasing order of proximity to query vector.
closestCentroidIDs, centroidDistances, _ :=
vecIndex.ObtainClustersWithDistancesFromIVFIndex(qVector,
eligibleCentroidIDs)
// Getting the nprobe value set at index time.
nprobe := vecIndex.GetNProbe()
eligibleDocsTillNow := int64(0)
minEligibleCentroids := 0
for i, centroidID := range closestCentroidIDs {
eligibleDocsTillNow += int64(centroidVecIDMap[centroidID].GetCardinality())
if eligibleDocsTillNow >= k && i >= int(nprobe-1) {
// Continue till at least 'K' cumulative vectors are
// collected or 'nprobe' clusters are examined, whichever
// comes later.
minEligibleCentroids = i + 1
break
}
minEligibleCentroids = i + 1
}
// Search the clusters specified by 'closestCentroidIDs' for
// vectors whose IDs are present in 'vectorIDsToInclude'
scores, ids, err := vecIndex.SearchClustersFromIVFIndex(
selector, len(vectorIDsToInclude), closestCentroidIDs,
minEligibleCentroids, k, qVector, centroidDistances, params)
if err != nil {
return nil, err
}
addIDsToPostingsList(rv, ids, scores)
return rv, nil
}
return rv, nil
},
close: func() {
@@ -372,8 +586,9 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
pos += n
}
vecIndex, vecDocIDMap, vectorIDsToExclude, err =
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except)
vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err =
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering,
except)
if vecIndex != nil {
vecIndexSize = vecIndex.Size()

View File

@@ -27,7 +27,6 @@ type chunkedIntDecoder struct {
data []byte
r *memUvarintReader
// atomic access to this variable
bytesRead uint64
}

View File

@@ -18,7 +18,6 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
)
// We can safely use 0 to represent termNotEncoded since 0
@@ -36,7 +35,6 @@ type chunkedIntCoder struct {
buf []byte
// atomic access to this variable
bytesWritten uint64
}
@@ -79,11 +77,11 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
}
func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}
func (c *chunkedIntCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}
// Add encodes the provided integers into the correct chunk for the provided

View File

@@ -73,8 +73,8 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
// wrap it for counting (tracking offsets)
cr := NewCountHashWriterWithStatsReporter(br, s)
newDocNums, numDocs, storedIndexOffset, _, _, _, sectionsIndexOffset, err :=
MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
if err != nil {
cleanup()
return nil, 0, err
@@ -109,9 +109,9 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
return newDocNums, uint64(cr.Count()), nil
}
func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64, dictLocs []uint64,
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64,
err error) {
@@ -122,7 +122,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
numDocs = computeNewDocCount(segments, drops)
if isClosed(closeCh) {
return nil, 0, 0, nil, nil, nil, 0, seg.ErrClosed
return nil, 0, 0, nil, nil, 0, seg.ErrClosed
}
// the merge opaque is especially important when it comes to tracking the file
@@ -140,7 +140,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}
// at this point, ask each section implementation to merge itself
@@ -149,21 +149,19 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
}
// we can persist the fields section index now, this will point
// to the various indexes (each in different section) available for a field.
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, dictLocs, mergeOpaque)
sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
if err != nil {
return nil, 0, 0, nil, nil, nil, 0, err
return nil, 0, 0, nil, nil, 0, err
}
return newDocNums, numDocs, storedIndexOffset, dictLocs, fieldsInv, fieldsMap, sectionsIndexOffset, nil
return newDocNums, numDocs, storedIndexOffset, fieldsInv, fieldsMap, sectionsIndexOffset, nil
}
// mapFields takes the fieldsInv list and returns a map of fieldName
@@ -306,6 +304,10 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo
use1HitEncoding func(uint64) (bool, uint64, uint64),
w *CountHashWriter, bufMaxVarintLen64 []byte) (
offset uint64, err error) {
if postings == nil {
return 0, nil
}
termCardinality := postings.GetCardinality()
if termCardinality <= 0 {
return 0, nil

View File

@@ -66,14 +66,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
s.chunkMode = chunkMode
s.w = NewCountHashWriter(&br)
storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert()
storedIndexOffset, sectionsIndexOffset, err := s.convert()
if err != nil {
return nil, uint64(0), err
}
sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
s.FieldsMap, s.FieldsInv, uint64(len(results)),
storedIndexOffset, dictOffsets, sectionsIndexOffset)
uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
// get the bytes written before the interim's reset() call
// write it to the newly formed segment base.
@@ -125,8 +124,10 @@ func (s *interim) reset() (err error) {
s.results = nil
s.chunkMode = 0
s.w = nil
s.FieldsMap = nil
s.FieldsInv = nil
for k := range s.FieldsMap {
delete(s.FieldsMap, k)
}
s.FieldsInv = s.FieldsInv[:0]
s.metaBuf.Reset()
s.tmp0 = s.tmp0[:0]
s.tmp1 = s.tmp1[:0]
@@ -168,8 +169,10 @@ type interimLoc struct {
arrayposs []uint64
}
func (s *interim) convert() (uint64, []uint64, uint64, error) {
s.FieldsMap = map[string]uint16{}
func (s *interim) convert() (uint64, uint64, error) {
if s.FieldsMap == nil {
s.FieldsMap = map[string]uint16{}
}
args := map[string]interface{}{
"results": s.results,
@@ -209,17 +212,15 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
storedIndexOffset, err := s.writeStoredFields()
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}
var dictOffsets []uint64
// we can persist the various sections at this point.
// the rule of thumb here is that each section must persist field wise.
for _, x := range segmentSections {
_, err = x.Persist(s.opaque, s.w)
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}
}
@@ -231,18 +232,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
}
}
if len(s.results) == 0 {
dictOffsets = make([]uint64, len(s.FieldsInv))
}
// we can persist a new fields section here
// this new fields section will point to the various indexes available
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, dictOffsets, s.opaque)
sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
if err != nil {
return 0, nil, 0, err
return 0, 0, err
}
return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil
return storedIndexOffset, sectionsIndexOffset, nil
}
func (s *interim) getOrDefineField(fieldName string) int {

View File

@@ -305,10 +305,7 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
chunkSize, err := getChunkSize(d.sb.chunkMode,
rv.postings.GetCardinality(), d.sb.numDocs)
if err != nil {
return err
} else if chunkSize == 0 {
return fmt.Errorf("chunk size is zero, chunkMode: %v, numDocs: %v",
d.sb.chunkMode, d.sb.numDocs)
return fmt.Errorf("failed to get chunk size: %v", err)
}
rv.chunkSize = chunkSize
@@ -632,6 +629,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool,
return i.nextDocNumAtOrAfterClean(atOrAfter)
}
if i.postings.chunkSize == 0 {
return 0, false, ErrChunkSizeZero
}
i.Actual.AdvanceIfNeeded(uint32(atOrAfter))
if !i.Actual.HasNext() || !i.all.HasNext() {
@@ -741,6 +742,10 @@ func (i *PostingsIterator) nextDocNumAtOrAfterClean(
return uint64(i.Actual.Next()), true, nil
}
if i.postings != nil && i.postings.chunkSize == 0 {
return 0, false, ErrChunkSizeZero
}
// freq-norm's needed, so maintain freq-norm chunk reader
sameChunkNexts := 0 // # of times we called Next() in the same chunk
n := i.Actual.Next()

View File

@@ -68,12 +68,14 @@ func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID
return vo.fieldAddrs[uint16(fieldID)]
}
// metadata corresponding to a serialized vector index
type vecIndexMeta struct {
// information specific to a vector index - (including metadata and
// the index pointer itself)
type vecIndexInfo struct {
startOffset int
indexSize uint64
vecIds []int64
indexOptimizedFor string
index *faiss.IndexImpl
}
// keep in mind with respect to update and delete operations with respect to vectors
@@ -87,7 +89,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
// in the segment this will help by avoiding multiple allocation
// calls.
vecSegs := make([]*SegmentBase, 0, len(segments))
indexes := make([]*vecIndexMeta, 0, len(segments))
indexes := make([]*vecIndexInfo, 0, len(segments))
for fieldID, fieldName := range fieldsInv {
indexes = indexes[:0] // resizing the slices
@@ -128,7 +130,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
pos += n
vecSegs = append(vecSegs, sb)
indexes = append(indexes, &vecIndexMeta{
indexes = append(indexes, &vecIndexInfo{
vecIds: make([]int64, 0, numVecs),
indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)],
})
@@ -182,7 +184,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
}
func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter,
vecToDocID map[int64]uint64, indexes []*vecIndexMeta) error {
vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error {
tempBuf := v.grabBuf(binary.MaxVarintLen64)
// early exit if there are absolutely no valid vectors present in the segment
@@ -275,9 +277,14 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
// todo: naive implementation. need to keep in mind the perf implications and improve on this.
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error {
vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs))
// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
var dims, metric int
var indexOptimizedFor string
var validMerge bool
var finalVecIDCap, indexDataCap, reconsCap int
for segI, segBase := range sbs {
// Considering merge operations on vector indexes are expensive, it is
@@ -287,26 +294,37 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
freeReconstructedIndexes(vecIndexes)
return seg.ErrClosed
}
if len(vecIndexes[segI].vecIds) == 0 {
// no valid vectors for this index, don't bring it into memory
continue
}
// read the index bytes. todo: parallelize this
indexBytes := segBase.mem[indexes[segI].startOffset : indexes[segI].startOffset+int(indexes[segI].indexSize)]
indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)]
index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
if len(indexes[segI].vecIds) > 0 {
indexReconsLen := len(indexes[segI].vecIds) * index.D()
if len(vecIndexes[segI].vecIds) > 0 {
indexReconsLen := len(vecIndexes[segI].vecIds) * index.D()
if indexReconsLen > reconsCap {
reconsCap = indexReconsLen
}
indexDataCap += indexReconsLen
finalVecIDCap += len(indexes[segI].vecIds)
finalVecIDCap += len(vecIndexes[segI].vecIds)
}
vecIndexes = append(vecIndexes, index)
vecIndexes[segI].index = index
validMerge = true
// set the dims and metric values from the constructed index.
dims = index.D()
metric = int(index.MetricType())
indexOptimizedFor = vecIndexes[segI].indexOptimizedFor
}
// no vector indexes to merge
if len(vecIndexes) == 0 {
// not a valid merge operation as there are no valid indexes to merge.
if !validMerge {
return nil
}
@@ -326,18 +344,18 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
// reconstruct the vectors only if present, it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
if len(vecIndexes[i].vecIds) > 0 {
neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D()
recons = recons[:neededReconsLen]
// todo: parallelize reconstruction
recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
recons, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, recons)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...)
}
}
@@ -351,12 +369,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
nvecs := len(finalVecIDs)
// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
indexOptimizedFor := indexes[0].indexOptimizedFor
// index type to be created after merge based on the number of vectors
// in indexData added into the index.
nlist := determineCentroids(nvecs)
@@ -419,9 +431,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
}
// todo: can be parallelized.
func freeReconstructedIndexes(indexes []*faiss.IndexImpl) {
for _, index := range indexes {
index.Close()
func freeReconstructedIndexes(indexes []*vecIndexInfo) {
for _, entry := range indexes {
if entry.index != nil {
entry.index.Close()
}
}
}
@@ -494,8 +508,10 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
ids = append(ids, hash)
}
// Set the faiss metric type (default is Euclidean Distance or l2_norm)
var metric = faiss.MetricL2
if content.metric == index.CosineSimilarity {
if content.metric == index.InnerProduct || content.metric == index.CosineSimilarity {
// use the same FAISS metric for inner product and cosine similarity
metric = faiss.MetricInnerProduct
}

View File

@@ -398,11 +398,11 @@ func (i *invertedIndexOpaque) grabBuf(size int) []byte {
}
func (i *invertedIndexOpaque) incrementBytesWritten(bytes uint64) {
atomic.AddUint64(&i.bytesWritten, bytes)
i.bytesWritten += bytes
}
func (i *invertedIndexOpaque) BytesWritten() uint64 {
return atomic.LoadUint64(&i.bytesWritten)
return i.bytesWritten
}
func (i *invertedIndexOpaque) BytesRead() uint64 {
@@ -412,7 +412,6 @@ func (i *invertedIndexOpaque) BytesRead() uint64 {
func (i *invertedIndexOpaque) ResetBytesRead(uint64) {}
func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) {
if io.results == nil || len(io.results) == 0 {
return nil, nil
}
@@ -462,7 +461,11 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin
locs := io.Locs[pid]
locOffset := 0
chunkSize, err := getChunkSize(io.chunkMode, postingsBS.GetCardinality(), uint64(len(io.results)))
var cardinality uint64
if postingsBS != nil {
cardinality = postingsBS.GetCardinality()
}
chunkSize, err := getChunkSize(io.chunkMode, cardinality, uint64(len(io.results)))
if err != nil {
return nil, err
}

View File

@@ -326,7 +326,7 @@ func (s *SegmentBase) loadFieldsNew() error {
if seek > uint64(len(s.mem)) {
// handling a buffer overflow case.
// a rare case where the backing buffer is not large enough to be read directly via
// a pos+binary.MaxVarinLen64 seek. For eg, this can happen when there is only
// a pos+binary.MaxVarintLen64 seek. For eg, this can happen when there is only
// one field to be indexed in the entire batch of data and while writing out
// these fields metadata, you write 1 + 8 bytes whereas the MaxVarintLen64 = 10.
seek = uint64(len(s.mem))
@@ -342,7 +342,7 @@ func (s *SegmentBase) loadFieldsNew() error {
// the following loop will be executed only once in the edge case pointed out above
// since there is only field's offset store which occupies 8 bytes.
// the pointer then seeks to a position preceding the sectionsIndexOffset, at
// which point the responbility of handling the out-of-bounds cases shifts to
// which point the responsibility of handling the out-of-bounds cases shifts to
// the specific section's parsing logic.
var fieldID uint64
for fieldID < numFields {
@@ -867,15 +867,6 @@ func (s *SegmentBase) loadDvReaders() error {
s.incrementBytesRead(read)
dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
}
if secID == SectionInvertedTextIndex {
s.dictLocs = append(s.dictLocs, dataLoc)
s.incrementBytesRead(uint64(n))
}
fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
if err != nil {
return err

View File

@@ -50,7 +50,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
return tw, nil
}
func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) {
func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
var rv uint64
fieldsOffsets := make([]uint64, 0, len(fieldsInv))

18
vendor/modules.txt vendored
View File

@@ -160,7 +160,7 @@ github.com/bitly/go-simplejson
# github.com/bits-and-blooms/bitset v1.12.0
## explicit; go 1.16
github.com/bits-and-blooms/bitset
# github.com/blevesearch/bleve/v2 v2.4.2
# github.com/blevesearch/bleve/v2 v2.4.3
## explicit; go 1.21
github.com/blevesearch/bleve/v2
github.com/blevesearch/bleve/v2/analysis
@@ -202,15 +202,15 @@ github.com/blevesearch/bleve/v2/search/scorer
github.com/blevesearch/bleve/v2/search/searcher
github.com/blevesearch/bleve/v2/size
github.com/blevesearch/bleve/v2/util
# github.com/blevesearch/bleve_index_api v1.1.10
# github.com/blevesearch/bleve_index_api v1.1.12
## explicit; go 1.20
github.com/blevesearch/bleve_index_api
# github.com/blevesearch/geo v0.1.20
## explicit; go 1.18
github.com/blevesearch/geo/geojson
github.com/blevesearch/geo/s2
# github.com/blevesearch/go-faiss v1.0.20
## explicit; go 1.19
# github.com/blevesearch/go-faiss v1.0.23
## explicit; go 1.21
github.com/blevesearch/go-faiss
# github.com/blevesearch/go-porterstemmer v1.0.3
## explicit; go 1.13
@@ -221,8 +221,8 @@ github.com/blevesearch/gtreap
# github.com/blevesearch/mmap-go v1.0.4
## explicit; go 1.13
github.com/blevesearch/mmap-go
# github.com/blevesearch/scorch_segment_api/v2 v2.2.15
## explicit; go 1.20
# github.com/blevesearch/scorch_segment_api/v2 v2.2.16
## explicit; go 1.21
github.com/blevesearch/scorch_segment_api/v2
# github.com/blevesearch/segment v0.9.1
## explicit; go 1.18
@@ -252,11 +252,11 @@ github.com/blevesearch/zapx/v13
# github.com/blevesearch/zapx/v14 v14.3.10
## explicit; go 1.19
github.com/blevesearch/zapx/v14
# github.com/blevesearch/zapx/v15 v15.3.13
# github.com/blevesearch/zapx/v15 v15.3.16
## explicit; go 1.19
github.com/blevesearch/zapx/v15
# github.com/blevesearch/zapx/v16 v16.1.5
## explicit; go 1.20
# github.com/blevesearch/zapx/v16 v16.1.8
## explicit; go 1.21
github.com/blevesearch/zapx/v16
# github.com/bluele/gcache v0.0.2
## explicit; go 1.15