mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
issue: https://github.com/milvus-io/milvus/issues/42148 For a vector field inside a STRUCT, since a STRUCT can only appear as the element type of an ARRAY field, the vector field in STRUCT is effectively an array of vectors, i.e. an embedding list. Milvus already supports searching embedding lists with metrics whose names start with the prefix MAX_SIM_. This PR allows Milvus to search embeddings inside an embedding list using the same metrics as normal embedding fields. Each embedding in the list is treated as an independent vector and participates in ANN search. Further, since STRUCT may contain scalar fields that are highly related to the embedding field, this PR introduces an element-level filter expression to refine search results. The grammar of the element-level filter is: element_filter(structFieldName, $[subFieldName] == 3) where $[subFieldName] refers to the value of subFieldName in each element of the STRUCT array structFieldName. It can be combined with existing filter expressions, for example: "varcharField == 'aaa' && element_filter(struct_field, $[struct_int] == 3)" A full example: ``` struct_schema = milvus_client.create_struct_field_schema() struct_schema.add_field("struct_str", DataType.VARCHAR, max_length=65535) struct_schema.add_field("struct_int", DataType.INT32) struct_schema.add_field("struct_float_vec", DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM) schema.add_field( "struct_field", datatype=DataType.ARRAY, element_type=DataType.STRUCT, struct_schema=struct_schema, max_capacity=1000, ) ... filter = "varcharField == 'aaa' && element_filter(struct_field, $[struct_int] == 3 && $[struct_str] == 'abc')" res = milvus_client.search( COLLECTION_NAME, data=query_embeddings, limit=10, anns_field="struct_field[struct_float_vec]", filter=filter, output_fields=["struct_field[struct_int]", "varcharField"], ) ``` TODO: 1. When an `element_filter` expression is used, a regular filter expression must also be present. Remove this restriction. 2. Implement `element_filter` expressions in the `query`. --------- Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
252 lines
8.9 KiB
Go
252 lines
8.9 KiB
Go
package segments
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/util/reduce"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
type SearchReduce interface {
|
|
ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.SearchResultData, info *reduce.ResultInfo) (*schemapb.SearchResultData, error)
|
|
}
|
|
|
|
type SearchCommonReduce struct{}
|
|
|
|
func (scr *SearchCommonReduce) ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.SearchResultData, info *reduce.ResultInfo) (*schemapb.SearchResultData, error) {
|
|
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "ReduceSearchResultData")
|
|
defer sp.End()
|
|
log := log.Ctx(ctx)
|
|
|
|
if len(searchResultData) == 0 {
|
|
return &schemapb.SearchResultData{
|
|
NumQueries: info.GetNq(),
|
|
TopK: info.GetTopK(),
|
|
FieldsData: make([]*schemapb.FieldData, 0),
|
|
Scores: make([]float32, 0),
|
|
Ids: &schemapb.IDs{},
|
|
Topks: make([]int64, 0),
|
|
}, nil
|
|
}
|
|
ret := &schemapb.SearchResultData{
|
|
NumQueries: info.GetNq(),
|
|
TopK: info.GetTopK(),
|
|
FieldsData: make([]*schemapb.FieldData, len(searchResultData[0].FieldsData)),
|
|
Scores: make([]float32, 0),
|
|
Ids: &schemapb.IDs{},
|
|
Topks: make([]int64, 0),
|
|
}
|
|
|
|
// Check element-level consistency: all results must have ElementIndices or none
|
|
hasElementIndices := searchResultData[0].ElementIndices != nil
|
|
for i, data := range searchResultData {
|
|
if (data.ElementIndices != nil) != hasElementIndices {
|
|
return nil, fmt.Errorf("inconsistent element-level flag in search results: result[0] has ElementIndices=%v, but result[%d] has ElementIndices=%v",
|
|
hasElementIndices, i, data.ElementIndices != nil)
|
|
}
|
|
}
|
|
if hasElementIndices {
|
|
ret.ElementIndices = &schemapb.LongArray{
|
|
Data: make([]int64, 0),
|
|
}
|
|
}
|
|
|
|
resultOffsets := make([][]int64, len(searchResultData))
|
|
for i := 0; i < len(searchResultData); i++ {
|
|
resultOffsets[i] = make([]int64, len(searchResultData[i].Topks))
|
|
for j := int64(1); j < info.GetNq(); j++ {
|
|
resultOffsets[i][j] = resultOffsets[i][j-1] + searchResultData[i].Topks[j-1]
|
|
}
|
|
ret.AllSearchCount += searchResultData[i].GetAllSearchCount()
|
|
}
|
|
|
|
var skipDupCnt int64
|
|
var retSize int64
|
|
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
|
for i := int64(0); i < info.GetNq(); i++ {
|
|
offsets := make([]int64, len(searchResultData))
|
|
idSet := make(map[interface{}]struct{})
|
|
var j int64
|
|
for j = 0; j < info.GetTopK(); {
|
|
sel := SelectSearchResultData(searchResultData, resultOffsets, offsets, i)
|
|
if sel == -1 {
|
|
break
|
|
}
|
|
idx := resultOffsets[sel][i] + offsets[sel]
|
|
|
|
id := typeutil.GetPK(searchResultData[sel].GetIds(), idx)
|
|
score := searchResultData[sel].Scores[idx]
|
|
|
|
// remove duplicates
|
|
if _, ok := idSet[id]; !ok {
|
|
retSize += typeutil.AppendFieldData(ret.FieldsData, searchResultData[sel].FieldsData, idx)
|
|
typeutil.AppendPKs(ret.Ids, id)
|
|
ret.Scores = append(ret.Scores, score)
|
|
if searchResultData[sel].ElementIndices != nil && ret.ElementIndices != nil {
|
|
ret.ElementIndices.Data = append(ret.ElementIndices.Data, searchResultData[sel].ElementIndices.Data[idx])
|
|
}
|
|
idSet[id] = struct{}{}
|
|
j++
|
|
} else {
|
|
// skip entity with same id
|
|
skipDupCnt++
|
|
}
|
|
offsets[sel]++
|
|
}
|
|
|
|
// if realTopK != -1 && realTopK != j {
|
|
// log.Warn("Proxy Reduce Search Result", zap.Error(errors.New("the length (topk) between all result of query is different")))
|
|
// // return nil, errors.New("the length (topk) between all result of query is different")
|
|
// }
|
|
ret.Topks = append(ret.Topks, j)
|
|
|
|
// limit search result to avoid oom
|
|
if retSize > maxOutputSize {
|
|
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", maxOutputSize)
|
|
}
|
|
}
|
|
log.Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
|
|
return ret, nil
|
|
}
|
|
|
|
type SearchGroupByReduce struct{}
|
|
|
|
func (sbr *SearchGroupByReduce) ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.SearchResultData, info *reduce.ResultInfo) (*schemapb.SearchResultData, error) {
|
|
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "ReduceSearchResultData")
|
|
defer sp.End()
|
|
log := log.Ctx(ctx)
|
|
|
|
if len(searchResultData) == 0 {
|
|
log.Debug("Shortcut return SearchGroupByReduce, directly return empty result", zap.Any("result info", info))
|
|
return &schemapb.SearchResultData{
|
|
NumQueries: info.GetNq(),
|
|
TopK: info.GetTopK(),
|
|
FieldsData: make([]*schemapb.FieldData, 0),
|
|
Scores: make([]float32, 0),
|
|
Ids: &schemapb.IDs{},
|
|
Topks: make([]int64, 0),
|
|
}, nil
|
|
}
|
|
ret := &schemapb.SearchResultData{
|
|
NumQueries: info.GetNq(),
|
|
TopK: info.GetTopK(),
|
|
FieldsData: make([]*schemapb.FieldData, len(searchResultData[0].FieldsData)),
|
|
Scores: make([]float32, 0),
|
|
Ids: &schemapb.IDs{},
|
|
Topks: make([]int64, 0),
|
|
}
|
|
|
|
// Check element-level consistency: all results must have ElementIndices or none
|
|
hasElementIndices := searchResultData[0].ElementIndices != nil
|
|
for i, data := range searchResultData {
|
|
if (data.ElementIndices != nil) != hasElementIndices {
|
|
return nil, fmt.Errorf("inconsistent element-level flag in search results: result[0] has ElementIndices=%v, but result[%d] has ElementIndices=%v",
|
|
hasElementIndices, i, data.ElementIndices != nil)
|
|
}
|
|
}
|
|
if hasElementIndices {
|
|
ret.ElementIndices = &schemapb.LongArray{
|
|
Data: make([]int64, 0),
|
|
}
|
|
}
|
|
|
|
resultOffsets := make([][]int64, len(searchResultData))
|
|
groupByValIterator := make([]func(int) any, len(searchResultData))
|
|
for i := range searchResultData {
|
|
resultOffsets[i] = make([]int64, len(searchResultData[i].Topks))
|
|
for j := int64(1); j < info.GetNq(); j++ {
|
|
resultOffsets[i][j] = resultOffsets[i][j-1] + searchResultData[i].Topks[j-1]
|
|
}
|
|
ret.AllSearchCount += searchResultData[i].GetAllSearchCount()
|
|
groupByValIterator[i] = typeutil.GetDataIterator(searchResultData[i].GetGroupByFieldValue())
|
|
}
|
|
gpFieldBuilder, err := typeutil.NewFieldDataBuilder(searchResultData[0].GetGroupByFieldValue().GetType(), true, int(info.GetTopK()))
|
|
if err != nil {
|
|
return ret, merr.WrapErrServiceInternal("failed to construct group by field data builder, this is abnormal as segcore should always set up a group by field, no matter data status, check code on qn", err.Error())
|
|
}
|
|
|
|
var filteredCount int64
|
|
var retSize int64
|
|
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
|
groupSize := info.GetGroupSize()
|
|
if groupSize <= 0 {
|
|
groupSize = 1
|
|
}
|
|
groupBound := info.GetTopK() * groupSize
|
|
|
|
for i := int64(0); i < info.GetNq(); i++ {
|
|
offsets := make([]int64, len(searchResultData))
|
|
|
|
idSet := make(map[interface{}]struct{})
|
|
groupByValueMap := make(map[interface{}]int64)
|
|
|
|
var j int64
|
|
for j = 0; j < groupBound; {
|
|
sel := SelectSearchResultData(searchResultData, resultOffsets, offsets, i)
|
|
if sel == -1 {
|
|
break
|
|
}
|
|
idx := resultOffsets[sel][i] + offsets[sel]
|
|
|
|
id := typeutil.GetPK(searchResultData[sel].GetIds(), idx)
|
|
groupByVal := groupByValIterator[sel](int(idx))
|
|
score := searchResultData[sel].Scores[idx]
|
|
if _, ok := idSet[id]; !ok {
|
|
groupCount := groupByValueMap[groupByVal]
|
|
if groupCount == 0 && int64(len(groupByValueMap)) >= info.GetTopK() {
|
|
// exceed the limit for group count, filter this entity
|
|
filteredCount++
|
|
} else if groupCount >= groupSize {
|
|
// exceed the limit for each group, filter this entity
|
|
filteredCount++
|
|
} else {
|
|
retSize += typeutil.AppendFieldData(ret.FieldsData, searchResultData[sel].FieldsData, idx)
|
|
typeutil.AppendPKs(ret.Ids, id)
|
|
ret.Scores = append(ret.Scores, score)
|
|
if searchResultData[sel].ElementIndices != nil && ret.ElementIndices != nil {
|
|
ret.ElementIndices.Data = append(ret.ElementIndices.Data, searchResultData[sel].ElementIndices.Data[idx])
|
|
}
|
|
gpFieldBuilder.Add(groupByVal)
|
|
groupByValueMap[groupByVal] += 1
|
|
idSet[id] = struct{}{}
|
|
j++
|
|
}
|
|
} else {
|
|
// skip entity with same pk
|
|
filteredCount++
|
|
}
|
|
offsets[sel]++
|
|
}
|
|
ret.Topks = append(ret.Topks, j)
|
|
|
|
// limit search result to avoid oom
|
|
if retSize > maxOutputSize {
|
|
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit %d", maxOutputSize)
|
|
}
|
|
}
|
|
ret.GroupByFieldValue = gpFieldBuilder.Build()
|
|
if float64(filteredCount) >= 0.3*float64(groupBound) {
|
|
log.Warn("GroupBy reduce filtered too many results, "+
|
|
"this may influence the final result seriously",
|
|
zap.Int64("filteredCount", filteredCount),
|
|
zap.Int64("groupBound", groupBound))
|
|
}
|
|
log.Debug("skip duplicated search result", zap.Int64("count", filteredCount))
|
|
return ret, nil
|
|
}
|
|
|
|
func InitSearchReducer(info *reduce.ResultInfo) SearchReduce {
|
|
if info.GetGroupByFieldId() > 0 {
|
|
return &SearchGroupByReduce{}
|
|
}
|
|
return &SearchCommonReduce{}
|
|
}
|